ATLAS Offline Software
Core transform class.
def __init__ (self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='')
    Initialise a job transform.
def name (self)
def exitCode (self)
def exitMsg (self)
def argdict (self)
def dataDictionary (self)
def report (self)
def transformStart (self)
def transformSetupCpuTime (self)
def transformSetupWallTime (self)
def inFileValidationCpuTime (self)
def inFileValidationWallTime (self)
def outFileValidationCpuTime (self)
def outFileValidationWallTime (self)
def outFileValidationStop (self)
def trfPredata (self)
def executors (self)
def processedEvents (self)
def getProcessedEvents (self)
def appendToExecutorSet (self, executors)
def parseCmdLineArgs (self, args)
    Parse command line arguments for a transform.
def setGlobalLogLevel (self)
    Check transform argument dictionary and set the correct root logger option.
def execute (self)
    Execute transform.
def setupSplitting (self)
    Setup executor splitting.
def lastExecuted (self)
    Return the last executor which actually executed.
def generateReport (self, reportType=None, fast=False, fileReport=defaultFileReport)
    Transform report generator.
def updateValidationDict (self, newValidationOptions)
    Setter for transform's validation dictionary.
def getValidationDict (self)
    Getter function for transform validation dictionary.
def getValidationOption (self, key)
    Getter for a specific validation option.
def getFiles (self, io=None)
    Return a list of fileArgs used by the transform.
def validateInFiles (self)
def validateOutFiles (self)
Core transform class.
- Note
- Every transform should have only one transform class instantiated.
Definition at line 39 of file transform.py.
◆ __init__()
def python.transform.transform.__init__ (self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs = True, trfName = None, executor = None, exeArgs = None, description = '')
Initialise a job transform.
- Parameters
-
standardSignalHandlers | Boolean to set signal handlers. Default True. |
standardValidationArgs | Boolean to set standard validation options. Default True. |
trfName | Name of the transform. Default is the executable name with '.py' stripped. |
executor | Executor list |
Definition at line 46 of file transform.py.
46 def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
47              trfName = None, executor = None, exeArgs = None, description = ''):
48     '''Transform class initialiser'''
49     msg.debug('Welcome to ATLAS job transforms')
52     self._transformStart = os.times()
53     msg.debug('transformStart time is {0}'.format(self._transformStart))
55     self._inFileValidationStart = None
56     self._inFileValidationStop = None
57     self._outFileValidationStart = None
58     self._outFileValidationStop = None
61     self._trfPredata = os.environ.get('TRF_PREDATA')
64     self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
69     self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
70                                argument_default=argparse.SUPPRESS,
71                                fromfile_prefix_chars='@')
76     if standardValidationArgs:
82     self._argdict = dict()
85     self._dataDictionary = dict()
89     self._executors = set()
90     self._executorDictionary = {}
93     if executor is not None:
94         self.appendToExecutorSet(executor or {transformExecutor()})
101    self._report = trfJobReport(parentTrf = self)
104    self._processedEvents = None
107    if standardSignalHandlers:
109        msg.debug('Standard signal handlers established')
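As a hedged illustration of how this initialiser is normally used, the sketch below builds a transform around a single executor and then extends its argument parser. The import paths and the athenaExecutor keyword arguments are assumptions for illustration, not taken from this page.

from PyJobTransforms.transform import transform
from PyJobTransforms.trfExe import athenaExecutor
from PyJobTransforms.trfArgs import addAthenaArguments

def getTransform():
    # Construct the transform with one executor; the transform name defaults
    # to the script name with '.py' stripped (see trfName above).
    trf = transform(executor=athenaExecutor(name='EVNTtoHITS', substep='sim',
                                            inData=['EVNT'], outData=['HITS']),
                    description='Example simulation transform')
    # Extra argparse options are attached to trf.parser (a trfArgParser).
    addAthenaArguments(trf.parser)
    return trf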
◆ _doSteering()
def python.transform.transform._doSteering (self, steeringDict = None)

private
Setup steering, which manipulates the graph before we trace the path for this transform.
- Parameters
-
steeringDict | Manual steering dictionary (if specified, this is used instead of the steering from the steering argument - pay attention to the input structure!) |
Definition at line 588 of file transform.py.
588 def _doSteering(self, steeringDict = None):
590     steeringDict = self._argdict['steering'].value
591     for substep, steeringValues in steeringDict.items():
593         for executor in self._executors:
594             if executor.name == substep or executor.substep == substep:
596                 msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
598                 for steeringValue in steeringValues:
599                     if steeringValue[0] == 'in':
600                         startSet = executor.inData
602                         startSet = executor.outData
603                     origLen = len(startSet)
604                     msg.debug('Data values to be modified are: {0}'.format(startSet))
605                     if steeringValue[1] == '+':
606                         startSet.add(steeringValue[2])
607                         if len(startSet) != origLen + 1:
608                             raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
609                                 'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
611                         startSet.discard(steeringValue[2])
612                         if len(startSet) != origLen - 1:
613                             raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
614                                 'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
615                     msg.debug('Updated data values to: {0}'.format(startSet))
617     raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
618         'This transform has no executor/substep {0}'.format(substep))
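Judging from the loop above, a manual steeringDict maps an executor or substep name to a list of (direction, operation, datatype) tuples, where the direction is 'in' or 'out', the operation is '+' (add) or '-' (remove), and the last element is the data type to modify. A hedged sketch of that structure follows; the substep and data type names are illustrative and 'trf' is assumed to be a transform instance.

# Each tuple steers one data set of the named executor/substep:
#   element 0: 'in' selects executor.inData, otherwise executor.outData
#   element 1: '+' adds the data type, '-' removes it
#   element 2: the data type itself
steeringDict = {
    'RAWtoESD': [('in', '-', 'RDO'),        # drop RDO from the substep inputs
                 ('in', '+', 'RDO_TRIG')],  # and require trigger RDOs instead
}
trf._doSteering(steeringDict=steeringDict)  # assuming 'trf' is a transform instance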
◆ _exitWithReport()
def python.transform.transform._exitWithReport (self, signum, frame)

private
Common signal handler.
This function is installed in place of the default signal handler and attempts to terminate the transform gracefully. When a signal is caught by the transform, the stdout from the running application process (i.e. athena.py) is allowed to continue uninterrupted and write its stdout to the log file (to retrieve the traceback) before the associated job report records the fact that a signal has been caught and completes the report accordingly.
- Parameters
-
signum | Signal number. Not used since this is a common handler assigned to predefined signals using _installSignalHandlers(). This parameter is still required to satisfy the signature of signal.signal(). |
frame | Not used. Provided here to satisfy the signature of signal.signal(). |
- Returns
- Does not return. Raises a SystemExit exception.
- Exceptions
-
Definition at line 724 of file transform.py.
724 def _exitWithReport(self, signum, frame):
725     msg.critical('Transform received signal {0}'.format(signum))
726     msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
727     self._exitCode = 128+signum
728     self._exitMsg = 'Transform received signal {0}'.format(signum)
733     msg.critical('Attempting to write reports with known information...')
734     self.generateReport(fast=True)
735     if ('orphanKiller' in self._argdict):
740     sys.exit(self._exitCode)
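For context, the snippet below shows how such a handler satisfies the signal.signal() contract described above; the transform installs its own handlers via setTrfSignalHandlers()/_installSignalHandlers(), so this is only an illustrative sketch assuming a transform instance trf.

import signal

# signal.signal() expects a callable taking (signum, frame), which is why
# _exitWithReport keeps both parameters even though frame is unused.
signal.signal(signal.SIGTERM, trf._exitWithReport)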
◆ _setupGraph()
def python.transform.transform._setupGraph (self)

private
Setup the executor graph.
- Note
- This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.
Definition at line 496 of file transform.py.
496 def _setupGraph(self):
498     self._inputData = list()
499     self._outputData = list()
501     for key, value in self._argdict.items():
503         m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
506         if isinstance(value, argFile):
507             if m.group(1) == 'input':
508                 self._inputData.append(m.group(2))
510                 self._outputData.append(m.group(2))
511             self._dataDictionary[m.group(2)] = value
512         elif isinstance(value, list) and value and isinstance(value[0], argFile):
513             if m.group(1) == 'input':
514                 self._inputData.append(m.group(2))
516                 self._outputData.append(m.group(2))
517             self._dataDictionary[m.group(2)] = value
521     if len(self._inputData) == 0:
522         self._inputData.append('inNULL')
523     if len(self._outputData) == 0:
524         self._outputData.append('outNULL')
525     msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))
529     if 'steering' in self._argdict:
530         msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
534     self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
535     self._executorGraph.doToposort()
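The regular expression above is what maps file argument names onto graph data types. A small self-contained sketch of the mapping it produces (the argument names are illustrative):

import re

# 'inputRDOFile' -> ('input', 'RDO'); 'outputESDFile' -> ('output', 'ESD');
# 'tmpHIST' -> ('tmp', 'HIST'); non-file arguments simply do not match.
pattern = re.compile(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$')
for key in ('inputRDOFile', 'outputESDFile', 'tmpHIST', 'maxEvents'):
    m = pattern.match(key)
    print(key, '->', (m.group(1), m.group(2)) if m else None)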
◆ _tracePath()
def python.transform.transform._tracePath (self)

private
Trace the path through the executor graph.
- Note
- This function might need to be called again when the number of 'substeps' is unknown just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD steps they need to run until after digitisation.
Definition at line 573 of file transform.py.
573 def _tracePath(self):
574     self._executorGraph.findExecutionPath()
576     self._executorPath = self._executorGraph.execution
577     if len(self._executorPath) == 0:
578         raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
579             'Execution path finding resulted in no substeps being executed'
580             '(Did you correctly specify input data for this transform?)')
582     self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
◆ appendToExecutorSet()
def python.transform.transform.appendToExecutorSet (self, executors)
Definition at line 220 of file transform.py.
220 def appendToExecutorSet(self, executors):
222     if isinstance(executors, transformExecutor):
223         executors = [executors,]
224     elif not isinstance(executors, (list, tuple, set)):
225         raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
226             'Transform was initialised with an executor which was not a simple executor or an executor set')
232     for executor in executors:
234         if executor.name in self._executorDictionary:
235             raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
236                 'Transform has been initialised with two executors with the same name ({0})'
237                 ' - executor names must be unique'.format(executor.name))
238         self._executors.add(executor)
239         self._executorDictionary[executor.name] = executor
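A hedged sketch of growing the executor set after construction (the transformExecutor import path and the executor names are assumptions); as the code above shows, duplicate names raise a TRF_INTERNAL exception.

from PyJobTransforms.trfExe import transformExecutor

# A single executor or any list/tuple/set of executors is accepted,
# provided every executor name is unique within this transform.
trf.appendToExecutorSet(transformExecutor(name='extraStep'))          # assuming 'trf' is a transform instance
trf.appendToExecutorSet([transformExecutor(name='stepA'),
                         transformExecutor(name='stepB')])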
◆ argdict()
def python.transform.transform.argdict (self)
◆ dataDictionary()
def python.transform.transform.dataDictionary (self)
Definition at line 137 of file transform.py.
137 def dataDictionary(self):
138 return self._dataDictionary
◆ execute()
def python.transform.transform.execute (self)
Execute transform.
This function calls the actual transform execution class and sets the self.exitCode, self.exitMsg and self.processedEvents transform data members.
- Returns
- None.
Definition at line 383 of file transform.py.
384     msg.debug('Entering transform execution phase')
388     if 'dumpargs' in self._argdict:
389         self.parser.dumpArgs()
393     msg.info('Resolving execution graph')
396     if 'showSteps' in self._argdict:
397         for exe in self._executors:
398             print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
399             if msg.level <= logging.DEBUG:
400                 print("   {0} -> {1}".format(exe.inData, exe.outData))
403     if 'showGraph' in self._argdict:
404         print(self._executorGraph)
408     msg.info('Starting to trace execution path')
410     msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
411              ' '.join([exe['name'] for exe in self._executorPath])))
413     if 'showPath' in self._argdict:
414         msg.debug('Execution path list is: {0}'.format(self._executorPath))
416         print('Executor path is:')
417         for node in self._executorPath:
421     msg.debug('Execution path is {0}'.format(self._executorPath))
424     for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
425         if dataType in self._dataDictionary:
426             msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
428             fileName = 'tmp.' + dataType
430             for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
431                 stdArgName = prefix + dataType + suffix
432                 if stdArgName in self.parser._argClass:
433                     msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
434                     self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
435                     self._dataDictionary[dataType].io = 'temporary'
437             if dataType not in self._dataDictionary:
438                 if 'HIST' in fileName:
439                     self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
442                     self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
443                     msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
444                 self._dataDictionary[dataType].name = fileName
447     self.setupSplitting()
450     for executor in self._executors:
451         executor.conf.setFromTransform(self)
453     self.validateInFiles()
455     for executionStep in self._executorPath:
456         msg.debug('Now preparing to execute {0}'.format(executionStep))
457         executor = self._executorDictionary[executionStep['name']]
458         executor.preExecute(input = executionStep['input'], output = executionStep['output'])
461         executor.postExecute()
465     self._processedEvents = self.getProcessedEvents()
466     self.validateOutFiles()
468     msg.debug('Transform executor succeeded')
470     self._exitMsg = trfExit.codeToName(self._exitCode)
472     except trfExceptions.TransformNeedCheckException as e:
473         msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
474         self._exitCode = e.errCode
475         self._exitMsg = e.errMsg
476         self.generateReport(fast=False)
478     except trfExceptions.TransformException as e:
479         msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
480         self._exitCode = e.errCode
481         self._exitMsg = e.errMsg
483         self.generateReport(fast=True)
489     msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
490     sys.exit(self._exitCode)
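Putting the pieces together, a typical transform entry point follows the sequence parse, execute, report, exit. This is a hedged sketch assuming the getTransform() helper from the __init__ example above; real transform scripts usually wrap main() in exception-handling decorators, which are omitted here.

import sys

def main():
    trf = getTransform()                 # build transform + executors (see the __init__ sketch)
    trf.parseCmdLineArgs(sys.argv[1:])   # fills the argument dictionary, sets log level
    trf.execute()                        # traces the graph and runs each substep
    trf.generateReport()                 # writes jobReport.json (and friends)
    sys.exit(trf.exitCode)               # exitCode falls back to TRF_UNKNOWN if unset

if __name__ == '__main__':
    main()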
◆ executors()
def python.transform.transform.executors (self)
◆ exitCode()
def python.transform.transform.exitCode (self)
Definition at line 117 of file transform.py.
118 if self._exitCode is None:
119     msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
120     return trfExit.nameToCode('TRF_UNKNOWN')
122 return self._exitCode
◆ exitMsg()
def python.transform.transform.exitMsg (self)
Definition at line 125 of file transform.py.
126 if self._exitMsg is None:
127     msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
◆ generateReport()
def python.transform.transform.generateReport (self, reportType = None, fast = False, fileReport = defaultFileReport)
Transform report generator.
- Parameters
-
fast | If True ensure that no external calls are made for file metadata (this is used to generate reports in a hurry after a crash or a forced exit) |
fileReport | Dictionary giving the type of report to make for each type of file. This dictionary has to have all io types as keys and valid values are: None - skip this io type; 'full' - Provide all details; 'name' - only dataset and filename will be reported on. |
reportType | Iterable with report types to generate, otherwise a sensible default is used (~everything, plus the Tier0 report at Tier0) |
Definition at line 645 of file transform.py.
645 def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
646     msg.debug('Transform report generator')
648     if 'reportType' in self._argdict:
649         if reportType is not None:
650             msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
651         reportType = self._argdict['reportType'].value
653     if reportType is None:
654         reportType = ['json', ]
657         if 'TZHOME' in os.environ:
658             reportType.append('gpickle')
661             reportType.append('text')
662             msg.debug('Detected Non-Interactive environment. Enabled text report')
664     if 'reportName' in self._argdict:
665         baseName = classicName = self._argdict['reportName'].value
667         baseName = 'jobReport'
668         classicName = 'metadata'
672     if reportType is None or 'text' in reportType:
673         envName = baseName if 'reportName' in self._argdict else 'env'
674         self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
676     if reportType is None or 'json' in reportType:
677         self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
679     if reportType is None or 'classic' in reportType:
680         self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
682     if reportType is None or 'gpickle' in reportType:
683         self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
685     if reportType is None or 'pilotPickle' in reportType:
686         self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)
688 except trfExceptions.TransformTimeoutException as reportException:
689     msg.error('Received timeout when writing report ({0})'.format(reportException))
690     msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
691     if ('orphanKiller' in self._argdict):
695     sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))
697 except trfExceptions.TransformException as reportException:
699     msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
700     msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
701     msg.critical('Job reports are likely to be missing or incomplete - sorry')
702     msg.critical('Please report this as a transforms bug!')
703     msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
704     msg.critical('Now exiting with a transform internal error code')
705     if ('orphanKiller' in self._argdict):
709     sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
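A hedged sketch of calling the report generator directly. The per-io-type values follow the choices documented above (None, 'full', 'name'); the exact key set expected by fileReport (matching argFile io values such as 'input', 'output', 'temporary') is an assumption, and trf is assumed to be a transform instance.

fileReport = {'input': 'name',      # only dataset and filename for inputs
              'output': 'full',     # full details for outputs
              'temporary': None}    # skip temporary files entirely
trf.generateReport(reportType=['json', 'text'], fast=True, fileReport=fileReport)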
◆ getFiles()
def python.transform.transform.getFiles (self, io = None)
Return a list of fileArgs used by the transform.
- Parameters
-
Definition at line 768 of file transform.py.
768 def getFiles(self, io = None):
770     msg.debug('Looking for file arguments matching: io={0}'.format(io))
771     for argName, arg in self._argdict.items():
772         if isinstance(arg, argFile):
773             msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
774             if io is not None and arg.io != io:
776             msg.debug('Argument {0} matches criteria'.format(argName))
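A short usage sketch, assuming a transform instance trf and that arg.io values include 'output' as seen elsewhere on this page:

# Collect only the output file arguments known to the transform.
for fileArg in trf.getFiles(io='output'):
    print(fileArg.value)   # the filename(s) held by this argFile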
◆ getProcessedEvents()
def python.transform.transform.getProcessedEvents (self)
Definition at line 212 of file transform.py.
212 def getProcessedEvents(self):
214     for executionStep in self._executorPath:
215         executor = self._executorDictionary[executionStep['name']]
216         if executor.conf.firstExecutor:
217             nEvts = executor.eventCount
◆ getValidationDict()
def python.transform.transform.getValidationDict (self)
Getter function for transform validation dictionary.
- Returns
- Validation dictionary
Definition at line 753 of file transform.py.
753 def getValidationDict(self):
754 return self.validation
◆ getValidationOption()
def python.transform.transform.getValidationOption (self, key)
Getter for a specific validation option.
- Parameters
-
key | Validation dictionary key |
- Returns
- Validation key value, or None if this key is absent
Definition at line 759 of file transform.py.
759 def getValidationOption(self, key):
760     if key in self.validation:
761         return self.validation[key]
◆ inFileValidationCpuTime()
def python.transform.transform.inFileValidationCpuTime (self)
Definition at line 165 of file transform.py.
165 def inFileValidationCpuTime(self):
166     inFileValidationCpuTime = None
167     if self._inFileValidationStart and self._inFileValidationStop:
168         inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
170     return inFileValidationCpuTime
◆ inFileValidationWallTime()
def python.transform.transform.inFileValidationWallTime (self)
Definition at line 173 of file transform.py.
173 def inFileValidationWallTime(self):
174     inFileValidationWallTime = None
175     if self._inFileValidationStart and self._inFileValidationStop:
176         inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)
178     return inFileValidationWallTime
◆ lastExecuted()
def python.transform.transform.lastExecuted (self)
Return the last executor which actually executed.
- Returns
- Last executor which has _hasExecuted == True, or the very first executor if we didn't even start yet
Definition at line 624 of file transform.py.
624 def lastExecuted(self):
626     if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
629     lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
630     for executorStep in self._executorPath[1:]:
631         if self._executorDictionary[executorStep['name']].hasExecuted:
632             lastExecutor = self._executorDictionary[executorStep['name']]
◆ name()
def python.transform.transform.name (self)
◆ outFileValidationCpuTime()
def python.transform.transform.outFileValidationCpuTime (self)
Definition at line 181 of file transform.py.
181 def outFileValidationCpuTime(self):
182     outFileValidationCpuTime = None
183     if self._outFileValidationStart and self._outFileValidationStop:
184         outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
186     return outFileValidationCpuTime
◆ outFileValidationStop()
def python.transform.transform.outFileValidationStop (self)
Definition at line 197 of file transform.py.
197 def outFileValidationStop(self):
198 return self._outFileValidationStop
◆ outFileValidationWallTime()
def python.transform.transform.outFileValidationWallTime (self)
Definition at line 189 of file transform.py.
189 def outFileValidationWallTime(self):
190     outFileValidationWallTime = None
191     if self._outFileValidationStart and self._outFileValidationStop:
192         outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
194     return outFileValidationWallTime
◆ parseCmdLineArgs()
def python.transform.transform.parseCmdLineArgs (self, args)
Parse command line arguments for a transform.
Definition at line 243 of file transform.py.
243 def parseCmdLineArgs(self, args):
248     self._argdict=vars(self.parser.parse_args(args))
252     inputFiles = outputFiles = False
253     for k, v in self._argdict.items():
254         if k.startswith('input') and isinstance(v, argFile):
256         elif k.startswith('output') and isinstance(v, argFile):
258     msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))
265     if 'AMIConfig' in self._argdict:
266         msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
268         tag=TagInfo(self._argdict['AMIConfig'].value)
270         for k, v in dict(tag.trfs[0]).items():
273             if inputFiles and k.startswith('input'):
274                 msg.debug('Suppressing argument {0} from AMI'
275                     ' because input files have been specified on the command line'.format(k))
277             if outputFiles and k.startswith('output'):
278                 msg.debug('Suppressing argument {0} from AMI'
279                     ' because output files have been specified on the command line'.format(k))
282         extraParameters.update(updateDict)
285     if 'argJSON' in self._argdict:
288         msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
289         argfile = open(self._argdict['argJSON'], 'r')
290         jsonParams = json.load(argfile)
291         msg.debug('Read: {0}'.format(jsonParams))
294     except Exception as e:
295         raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))
298     if 'eventService' in self._argdict and self._argdict['eventService'].value:
300         updateDict['athenaMPMergeTargetSize'] = '*:0'
301         updateDict['checkEventCount'] = False
302         updateDict['outputFileValidation'] = False
303         extraParameters.update(updateDict)
307     argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
308     for k,v in extraParameters.items():
309         msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
310         if k not in self.parser._argClass and k not in self.parser._argAlias:
311             raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
313         if k in self.parser._argAlias:
314             msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
315             k = self.parser._argAlias[k]
318             msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
321         if '__call__' in dir(self.parser._argClass[k]):
322             self._argdict[k] = self.parser._argClass[k](v)
325         msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))
329     for k, v in self._argdict.items():
330         if isinstance(v, argument):
332         elif isinstance(v, list):
334             if isinstance(it, argument):
338     if 'dumpPickle' in self._argdict:
339         msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
344     if 'dumpJSON' in self._argdict:
345         msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
349     except trfExceptions.TransformArgException as e:
350         msg.critical('Argument parsing failure: {0!s}'.format(e))
351         self._exitCode = e.errCode
352         self._exitMsg = e.errMsg
353         self._report.fast = True
354         self.generateReport()
355         sys.exit(self._exitCode)
357     except trfExceptions.TransformAMIException as e:
358         msg.critical('AMI failure: {0!s}'.format(e))
359         self._exitCode = e.errCode
360         self._exitMsg = e.errMsg
361         sys.exit(self._exitCode)
363     self.setGlobalLogLevel()
◆ processedEvents()
def python.transform.transform.processedEvents (self)
Definition at line 209 of file transform.py.
209 def processedEvents(self):
210 return self._processedEvents
◆ report()
def python.transform.transform.report (self)
◆ setGlobalLogLevel()
def python.transform.transform.setGlobalLogLevel (self)
Check transform argument dictionary and set the correct root logger option.
Definition at line 367 of file transform.py.
367 def setGlobalLogLevel(self):
368     if 'verbose' in self._argdict:
370     elif 'loglevel' in self._argdict:
371         if self._argdict['loglevel'] in stdLogLevels:
372             msg.info("Loglevel option found - setting root logger level to %s",
373                 logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
376             msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
◆ setupSplitting()
def python.transform.transform.setupSplitting (self)
Setup executor splitting.
Definition at line 538 of file transform.py.
538 def setupSplitting(self):
539     if 'splitConfig' not in self._argdict:
543     for executionStep in self._executorPath:
544         baseStepName = executionStep['name']
545         if baseStepName in split:
548         baseExecutor = self._executorDictionary[baseStepName]
553         msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
554         index = self._executorPath.index(executionStep)
555         baseStep = self._executorPath.pop(index)
556         for i in range(splitting):
557             name = baseStepName + executorStepSuffix + str(i)
558             step = copy.deepcopy(baseStep)
560             self._executorPath.insert(index + i, step)
561             executor = copy.deepcopy(baseExecutor)
563             executor.conf.executorStep = i
564             executor.conf.totalExecutorSteps = splitting
565             self._executors.add(executor)
566             self._executorDictionary[name] = executor
◆ transformSetupCpuTime()
def python.transform.transform.transformSetupCpuTime (self)
Definition at line 149 of file transform.py.
149 def transformSetupCpuTime(self):
150     transformSetupCpuTime = None
151     if self._transformStart and self._inFileValidationStart:
152         transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)
154     return transformSetupCpuTime
◆ transformSetupWallTime()
def python.transform.transform.transformSetupWallTime (self)
Definition at line 157 of file transform.py.
157 def transformSetupWallTime(self):
158     transformSetupWallTime = None
159     if self._transformStart and self._inFileValidationStart:
160         transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)
162     return transformSetupWallTime
◆ transformStart()
def python.transform.transform.transformStart (self)
Definition at line 145 of file transform.py.
145 def transformStart(self):
146 return self._transformStart
◆ trfPredata()
def python.transform.transform.trfPredata (self)
Definition at line 201 of file transform.py.
201 def trfPredata(self):
202 return self._trfPredata
◆ updateValidationDict()
def python.transform.transform.updateValidationDict (self, newValidationOptions)
Setter for transform's validation dictionary.
This function updates the validation dictionary for the transform, updating values which are passed in the newValidationOptions argument.
- Parameters
-
newValidationOptions | Dictionary (or tuples) to update validation dictionary with |
- Returns
- None
Definition at line 748 of file transform.py.
748 def updateValidationDict(self, newValidationOptions):
749     self.validation.update(newValidationOptions)
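A minimal sketch of updating and reading back a validation option, assuming a transform instance trf; the key name 'testIfEmpty' is illustrative and not guaranteed to exist in every release.

trf.updateValidationDict({'testIfEmpty': True})   # merge into the validation dictionary
print(trf.getValidationOption('testIfEmpty'))     # -> True
print(trf.getValidationDict())                    # the full dictionary after the update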
◆ validateInFiles()
def python.transform.transform.validateInFiles (self)
Definition at line 781 of file transform.py.
781 def validateInFiles(self):
782     if self._inFileValidationStart is None:
783         self._inFileValidationStart = os.times()
784         msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))
786     if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
787         ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
788         ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
789         ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
791         msg.info('Standard input file validation turned off for transform %s.', self.name)
793         msg.info('Validating input files')
794         if 'parallelFileValidation' in self._argdict:
795             trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
797             trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')
799     self._inFileValidationStop = os.times()
800     msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
◆ validateOutFiles()
def python.transform.transform.validateOutFiles (self)
Definition at line 802 of file transform.py.
802 def validateOutFiles(self):
803     if self._outFileValidationStart is None:
804         self._outFileValidationStart = os.times()
805         msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))
807     if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
808         ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
809         ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
810         ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
812         msg.info('Standard output file validation turned off for transform %s.', self.name)
814         msg.info('Validating output files')
815         parparallelMode = False
816         parmultithreadedMode = False
817         if 'parallelFileValidation' in self._argdict:
818             parparallelMode = self._argdict['parallelFileValidation'].value
819         if 'multithreadedFileValidation' in self._argdict:
820             parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
821         trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)
823     self._outFileValidationStop = os.times()
824     msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))
◆ _argdict
python.transform.transform._argdict

private
Argument dictionary for this transform.
Definition at line 81 of file transform.py.
◆ _dataDictionary
python.transform.transform._dataDictionary

private
Data dictionary placeholder (this maps data types to their argFile instances).
Definition at line 84 of file transform.py.
◆ _executorDictionary
python.transform.transform._executorDictionary

private
◆ _executorGraph
python.transform.transform._executorGraph

private
- Note
- If we have no real data then add the pseudo datatype NULL, which allows us to manage transforms which can run without data.
Definition at line 534 of file transform.py.
◆ _executorPath
python.transform.transform._executorPath

private
◆ _executors
python.transform.transform._executors

private
◆ _exitCode
python.transform.transform._exitCode

private
Transform exit code/message holders.
Definition at line 96 of file transform.py.
◆ _exitMsg
python.transform.transform._exitMsg

private
◆ _inFileValidationStart
python.transform.transform._inFileValidationStart

private
◆ _inFileValidationStop
python.transform.transform._inFileValidationStop

private
◆ _inputData
python.transform.transform._inputData

private
◆ _name
python.transform.transform._name

private
◆ _outFileValidationStart
python.transform.transform._outFileValidationStart

private
◆ _outFileValidationStop
python.transform.transform._outFileValidationStop

private
◆ _outputData
python.transform.transform._outputData

private
◆ _processedEvents
python.transform.transform._processedEvents

private
◆ _report
python.transform.transform._report

private
Report object for this transform.
Definition at line 100 of file transform.py.
◆ _transformStart
python.transform.transform._transformStart

private
Get transform starting timestamp as early as possible.
Definition at line 51 of file transform.py.
◆ _trfPredata
python.transform.transform._trfPredata

private
Get trf pre-data as early as possible.
Definition at line 60 of file transform.py.
◆ parser
python.transform.transform.parser
The documentation for this class was generated from the following file: transform.py