msg = logging.getLogger(__name__)

import PyJobTransforms.trfExceptions as trfExceptions

from PyJobTransforms.trfArgs import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
from PyJobTransforms.trfExeStepTools import executorStepSuffix, getTotalExecutorSteps
    def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
                 trfName = None, executor = None, exeArgs = None, description = ''):
        '''Transform class initialiser'''
        msg.debug('Welcome to ATLAS job transforms')

        ## @brief Get transform starting timestamp as early as possible
        self._transformStart = os.times()
        msg.debug('transformStart time is {0}'.format(self._transformStart))

        self._inFileValidationStart = None
        self._inFileValidationStop = None
        self._outFileValidationStart = None
        self._outFileValidationStop = None

        ## @brief Get trf pre-data as early as possible
        self._trfPredata = os.environ.get('TRF_PREDATA')

        self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]

        ## @note Holder for arguments this trf understands
        # Use @c argparse.SUPPRESS to have non-given arguments unset, rather than None
        # Support reading arguments from a file using the notation @c @file
        self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
                                   argument_default=argparse.SUPPRESS,
                                   fromfile_prefix_chars='@')
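        # Hedged illustration of the @file mechanism (the script name is an
        # example, not from this file): "MyJob_tf.py @extra_args.txt" makes
        # argparse read additional command line tokens from extra_args.txt.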
        if standardTrfArgs:
            addStandardTrfArgs(self.parser)

        if standardValidationArgs:
            addValidationArguments(self.parser)
            addFileValidationArguments(self.parser)
        ## Argument dictionary for this transform
        self._argdict = dict()

        ## Data dictionary placeholder (this maps data types to their argFile instances)
        self._dataDictionary = dict()

        # Transform executor list - initialise with an empty set
        self._executors = set()
        self._executorDictionary = {}
        # Append the given executors or a default one to the set:
        if executor is not None:
            self.appendToExecutorSet(executor)
        else:
            self.appendToExecutorSet({transformExecutor()})
        ## Transform exit code/message holders
        self._exitCode = None
        self._exitMsg = None

        ## Report object for this transform
        self._report = trfJobReport(parentTrf = self)

        ## Transform processed events
        self._processedEvents = None

        # Setup standard signal handling if asked
        if standardSignalHandlers:
            setTrfSignalHandlers(self._exitWithReport)
            msg.debug('Standard signal handlers established')
    @property
    def exitCode(self):
        if self._exitCode is None:
            msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
            return trfExit.nameToCode('TRF_UNKNOWN')
        return self._exitCode

    @property
    def exitMsg(self):
        if self._exitMsg is None:
            msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
            return ''
        return self._exitMsg
    @property
    def dataDictionary(self):
        return self._dataDictionary

    @property
    def transformStart(self):
        return self._transformStart

    @property
    def transformSetupCpuTime(self):
        transformSetupCpuTime = None
        if self._transformStart and self._inFileValidationStart:
            transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)
        return transformSetupCpuTime

    @property
    def transformSetupWallTime(self):
        transformSetupWallTime = None
        if self._transformStart and self._inFileValidationStart:
            transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)
        return transformSetupWallTime

    @property
    def inFileValidationCpuTime(self):
        inFileValidationCpuTime = None
        if self._inFileValidationStart and self._inFileValidationStop:
            inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
        return inFileValidationCpuTime

    @property
    def inFileValidationWallTime(self):
        inFileValidationWallTime = None
        if self._inFileValidationStart and self._inFileValidationStop:
            inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)
        return inFileValidationWallTime

    @property
    def outFileValidationCpuTime(self):
        outFileValidationCpuTime = None
        if self._outFileValidationStart and self._outFileValidationStop:
            outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
        return outFileValidationCpuTime

    @property
    def outFileValidationWallTime(self):
        outFileValidationWallTime = None
        if self._outFileValidationStart and self._outFileValidationStop:
            outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
        return outFileValidationWallTime

    @property
    def outFileValidationStop(self):
        return self._outFileValidationStop

    @property
    def trfPredata(self):
        return self._trfPredata

    @property
    def executors(self):
        return self._executors

    @property
    def processedEvents(self):
        return self._processedEvents
    def getProcessedEvents(self):
        nEvts = None
        for executionStep in self._executorPath:
            executor = self._executorDictionary[executionStep['name']]
            if executor.conf.firstExecutor:
                nEvts = executor.eventCount
        return nEvts
    def appendToExecutorSet(self, executors):
        # Normalise to something iterable
        if isinstance(executors, transformExecutor):
            executors = [executors,]
        elif not isinstance(executors, (list, tuple, set)):
            raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                                                           'Transform was initialised with an executor which was not a simple executor or an executor set')

        # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
        # Executor book keeping: set the parent link back to this transform for all executors.
        # Also set up a dictionary, indexed by executor name, and check that each name is unique.
        ## Setting conf here does not work - it is too early to get the dataDictionary
        for executor in executors:
            executor.trf = self
            if executor.name in self._executorDictionary:
                raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                                                               'Transform has been initialised with two executors with the same name ({0})'
                                                               ' - executor names must be unique'.format(executor.name))
            self._executors.add(executor)
            self._executorDictionary[executor.name] = executor
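    # Hedged usage sketch for appendToExecutorSet (executor names are
    # illustrative assumptions, not taken from this file):
    #
    #   trf = transform(executor=transformExecutor(name='step1'))
    #   trf.appendToExecutorSet(transformExecutor(name='step2'))
    #
    # Appending another executor named 'step1' would raise
    # TransformInternalException, since executor names must be unique.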
    ## @brief Parse command line arguments for a transform
    def parseCmdLineArgs(self, args):
        msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))

        try:
            # Use the argparse infrastructure to get the actual command line arguments
            self._argdict = vars(self.parser.parse_args(args))

            # Need to know if any input or output files were set - if so then we suppress the
            # corresponding parameters from AMI
            inputFiles = outputFiles = False
            for k, v in self._argdict.items():
                if k.startswith('input') and isinstance(v, argFile):
                    inputFiles = True
                elif k.startswith('output') and isinstance(v, argFile):
                    outputFiles = True
            msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))
            # Now look for special arguments, which expand out to other parameters
            # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
            # (However, we defend the real command line against updates from either source)
            extraParameters = {}

            if 'AMIConfig' in self._argdict:
                msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
                from PyJobTransforms.trfAMI import TagInfo
                tag = TagInfo(self._argdict['AMIConfig'].value)
                updateDict = {}
                for k, v in dict(tag.trfs[0]).items():
                    # Convert to the correct internal key form
                    k = cliToKey(k)
                    if inputFiles and k.startswith('input'):
                        msg.debug('Suppressing argument {0} from AMI'
                                  ' because input files have been specified on the command line'.format(k))
                        continue
                    if outputFiles and k.startswith('output'):
                        msg.debug('Suppressing argument {0} from AMI'
                                  ' because output files have been specified on the command line'.format(k))
                        continue
                    updateDict[k] = v
                extraParameters.update(updateDict)
            if 'argJSON' in self._argdict:
                try:
                    import json
                    msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
                    argfile = open(self._argdict['argJSON'], 'r')
                    jsonParams = json.load(argfile)
                    msg.debug('Read: {0}'.format(jsonParams))
                    extraParameters.update(convertToStr(jsonParams))
                    argfile.close()
                except Exception as e:
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))

            if 'eventService' in self._argdict and self._argdict['eventService'].value:
                updateDict = {}
                updateDict['athenaMPMergeTargetSize'] = '*:0'
                updateDict['checkEventCount'] = False
                updateDict['outputFileValidation'] = False
                extraParameters.update(updateDict)
            # Process anything we found
            # List of command line arguments
            argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-') ]
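            # Illustrative mapping (assumed example tokens): '--maxEvents=10'
            # contributes 'maxEvents' to argsList and the bare flag
            # '--verbose' contributes 'verbose'.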
            for k, v in extraParameters.items():
                msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
                if k not in self.parser._argClass and k not in self.parser._argAlias:
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
                # Check if it is an alias
                if k in self.parser._argAlias:
                    msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
                    k = self.parser._argAlias[k]
                # Check if the argument has already been set on the command line
                if k in argsList:
                    msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
                    continue
                # For callable classes we instantiate properly, otherwise we set the value for simple arguments
                if '__call__' in dir(self.parser._argClass[k]):
                    self._argdict[k] = self.parser._argClass[k](v)
                else:
                    self._argdict[k] = v
                msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))
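                # Hedged illustration of the callable branch: if
                # self.parser._argClass[k] were, say, trfArgClasses.argInt (an
                # assumed example), a raw extra value of '10' would be
                # instantiated as argInt('10') here.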
            # Set the key name as an argument property - useful to be able to look back at where this
            # argument came from
            for k, v in self._argdict.items():
                if isinstance(v, argument):
                    v.name = k
                elif isinstance(v, list):
                    for it in v:
                        if isinstance(it, argument):
                            it.name = k
            # Now that all arguments are parsed, do a pickle dump here and exit, if requested
            if 'dumpPickle' in self._argdict:
                msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
                pickledDump(self._argdict)
                sys.exit(0)

            # Likewise for a JSON dump of the command line
            if 'dumpJSON' in self._argdict:
                msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
                JSONDump(self._argdict)
                sys.exit(0)
        except trfExceptions.TransformArgException as e:
            msg.critical('Argument parsing failure: {0!s}'.format(e))
            self._exitCode = e.errCode
            self._exitMsg = e.errMsg
            self._report.fast = True
            self.generateReport()
            sys.exit(self._exitCode)

        except trfExceptions.TransformAMIException as e:
            msg.critical('AMI failure: {0!s}'.format(e))
            self._exitCode = e.errCode
            self._exitMsg = e.errMsg
            sys.exit(self._exitCode)

        self.setGlobalLogLevel()
    ## @brief Check the transform argument dictionary and set the correct root logger option
    def setGlobalLogLevel(self):
        if 'verbose' in self._argdict:
            setRootLoggerLevel(stdLogLevels['DEBUG'])
        elif 'loglevel' in self._argdict:
            if self._argdict['loglevel'] in stdLogLevels:
                msg.info("Loglevel option found - setting root logger level to %s",
                         logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
                setRootLoggerLevel(stdLogLevels[self._argdict['loglevel']])
            else:
                msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
    ## @brief Execute transform
    # @details This function calls the actual transform execution class and
    # sets @c self.exitCode, @c self.exitMsg and @c self.processedEvents transform data members.
    def execute(self):
        msg.debug('Entering transform execution phase')

        try:
            # Intercept a few special options here
            if 'dumpargs' in self._argdict:
                self.parser.dumpArgs()
                sys.exit(0)

            msg.info('Resolving execution graph')
            self._setupGraph()

            if 'showSteps' in self._argdict:
                for exe in self._executors:
                    print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
                    if msg.level <= logging.DEBUG:
                        print(" {0} -> {1}".format(exe.inData, exe.outData))
                sys.exit(0)

            if 'showGraph' in self._argdict:
                print(self._executorGraph)
                sys.exit(0)

            msg.info('Starting to trace execution path')
            self._tracePath()
            msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
                                                                         ' '.join([exe['name'] for exe in self._executorPath])))

            if 'showPath' in self._argdict:
                msg.debug('Execution path list is: {0}'.format(self._executorPath))
                print('Executor path is:')
                for node in self._executorPath:
                    print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
                sys.exit(0)

            msg.debug('Execution path is {0}'.format(self._executorPath))
            # Prepare files for execution (separate method?)
            for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
                if dataType in self._dataDictionary:
                    msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
                else:
                    fileName = 'tmp.' + dataType
                    # How to pick the correct argFile class? Try the standard argument names:
                    for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
                        stdArgName = prefix + dataType + suffix
                        if stdArgName in self.parser._argClass:
                            msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
                            self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
                            self._dataDictionary[dataType].io = 'temporary'
                            break
                    if dataType not in self._dataDictionary:
                        if 'HIST' in fileName:
                            self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
                        else:
                            self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
                            msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
                        self._dataDictionary[dataType].name = fileName
            # Do splitting if required
            self.setupSplitting()

            # Error if more than one executor in MPI mode
            if 'mpi' in self._argdict:
                if len(self._executorPath) > 1:
                    msg.error("MPI mode is not supported for jobs with more than one execution step!")
                    msg.error(f"We have {len(self._executorPath)}: {self._executorPath}")
            # Now we can set the final executor configuration properly, with the final dataDictionary
            for executor in self._executors:
                executor.conf.setFromTransform(self)

            self.validateInFiles()

            for executionStep in self._executorPath:
                msg.debug('Now preparing to execute {0}'.format(executionStep))
                executor = self._executorDictionary[executionStep['name']]
                executor.preExecute(input = executionStep['input'], output = executionStep['output'])
                executor.execute()
                executor.postExecute()

            # Swap out the output files for the version with [] lists expanded
            if 'mpi' in self._argdict:
                new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig["outputs"]}
                self._dataDictionary = new_data_dict
                executor.conf._dataDictionary = new_data_dict

            self._processedEvents = self.getProcessedEvents()
            self.validateOutFiles()

            msg.debug('Transform executor succeeded')
            self._exitCode = 0
            self._exitMsg = trfExit.codeToName(self._exitCode)
        except trfExceptions.TransformNeedCheckException as e:
            msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
            self._exitCode = e.errCode
            self._exitMsg = e.errMsg
            self.generateReport(fast=False)

        except trfExceptions.TransformException as e:
            msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
            self._exitCode = e.errCode
            self._exitMsg = e.errMsg
            # Try and write a job report...
            self.generateReport(fast=True)

        finally:
            # Clean up any orphaned processes and exit here if things went bad
            infanticide(message=True)
            if self._exitCode:
                msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
                sys.exit(self._exitCode)
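    # Hedged end-to-end usage sketch (executor name and details are
    # illustrative assumptions): a typical *_tf.py script builds a transform,
    # parses arguments, executes and reports, e.g.
    #
    #   trf = transform(executor=transformExecutor(name='myStep'))
    #   trf.parseCmdLineArgs(sys.argv[1:])
    #   trf.execute()
    #   trf.generateReport()
    #   sys.exit(trf.exitCode)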
    ## @brief Setup the executor graph
    # @note This function might need to be called again when the number of 'substeps' is unknown
    # just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD
    # steps they need to run until after digitisation.
    def _setupGraph(self):
        # Get input/output data
        self._inputData = list()
        self._outputData = list()

        for key, value in self._argdict.items():
            # Note: the specifier [A-Za-z0-9_]+? makes this match non-greedy (avoids swallowing the optional 'File' suffix)
            m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
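            # Illustrative matches (assumed argument names): 'inputESDFile' ->
            # ('input', 'ESD', 'File'); 'outputAODFile' -> ('output', 'AOD',
            # 'File'); 'tmpRDO' -> ('tmp', 'RDO', None).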
            # N.B. Protect against taking arguments which are not of type argFile
            if m:
                if isinstance(value, argFile):
                    if m.group(1) == 'input':
                        self._inputData.append(m.group(2))
                    else:
                        self._outputData.append(m.group(2))
                    self._dataDictionary[m.group(2)] = value
                elif isinstance(value, list) and value and isinstance(value[0], argFile):
                    if m.group(1) == 'input':
                        self._inputData.append(m.group(2))
                    else:
                        self._outputData.append(m.group(2))
                    self._dataDictionary[m.group(2)] = value
        ## @note If we have no real data then add the pseudo datatype NULL, which allows us to manage
        # transforms which can run without data
        if len(self._inputData) == 0:
            self._inputData.append('inNULL')
        if len(self._outputData) == 0:
            self._outputData.append('outNULL')
        msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))

        # Now see if we have any steering - manipulate the substep inputs and outputs before we
        # setup the graph
        if 'steering' in self._argdict:
            msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
            self._doSteering()

        # Setup the graph and topo sort it
        self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
        self._executorGraph.doToposort()
    ## @brief Setup executor splitting
    def setupSplitting(self):
        if 'splitConfig' not in self._argdict:
            return

        split = []
        for executionStep in self._executorPath:
            baseStepName = executionStep['name']
            if baseStepName in split:
                continue

            baseExecutor = self._executorDictionary[baseStepName]
            splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
            if splitting <= 1:
                continue

            msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
            index = self._executorPath.index(executionStep)
            baseStep = self._executorPath.pop(index)
            for i in range(splitting):
                name = baseStepName + executorStepSuffix + str(i)
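                # e.g. with splitting == 2 the base step 'RAWtoALL' (an
                # illustrative name) becomes 'RAWtoALL' + executorStepSuffix +
                # '0' and 'RAWtoALL' + executorStepSuffix + '1'.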
                step = copy.deepcopy(baseStep)
                step['name'] = name
                self._executorPath.insert(index + i, step)
                executor = copy.deepcopy(baseExecutor)
                executor.name = name
                executor.conf.executorStep = i
                executor.conf.totalExecutorSteps = splitting
                self._executors.add(executor)
                self._executorDictionary[name] = executor
            split.append(baseStepName)
    ## @brief Trace the path through the executor graph
    # @note This function might need to be called again when the number of 'substeps' is unknown
    # just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD
    # steps they need to run until after digitisation.
    def _tracePath(self):
        self._executorGraph.findExecutionPath()

        self._executorPath = self._executorGraph.execution
        if len(self._executorPath) == 0:
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
                                                        'Execution path finding resulted in no substeps being executed '
                                                        '(Did you correctly specify input data for this transform?)')
        # Tell the first executor that they are the first
        self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
    ## @brief Setup steering, which manipulates the graph before we trace the path
    # @param steeringDict Manual steering dictionary (if specified, this is used instead of the
    # steering from the @c steering argument - pay attention to the input structure!)
    def _doSteering(self, steeringDict = None):
        if not steeringDict:
            steeringDict = self._argdict['steering'].value
        for substep, steeringValues in steeringDict.items():
            foundSubstep = False
            for executor in self._executors:
                if executor.name == substep or executor.substep == substep:
                    foundSubstep = True
                    msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
                    # Steering consists of tuples with (in/out, +/-, datatype)
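                    # Hedged example of a steering entry (the substep name is
                    # an assumption): {'RAWtoALL': [('in', '-', 'RAW'), ('in', '+', 'RDO')]}
                    # removes RAW from, and adds RDO to, the substep's inputs.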
                    for steeringValue in steeringValues:
                        if steeringValue[0] == 'in':
                            startSet = executor.inData
                        else:
                            startSet = executor.outData
                        origLen = len(startSet)
                        msg.debug('Data values to be modified are: {0}'.format(startSet))
                        if steeringValue[1] == '+':
                            startSet.add(steeringValue[2])
                            if len(startSet) != origLen + 1:
                                raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                                                            'Attempting to add data type {0} to {1} (steering {2}) failed (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                        else:
                            startSet.discard(steeringValue[2])
                            if len(startSet) != origLen - 1:
                                raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                                                            'Attempting to remove data type {0} from {1} (steering {2}) failed (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                        msg.debug('Updated data values to: {0}'.format(startSet))
            if not foundSubstep:
                raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                                            'This transform has no executor/substep {0}'.format(substep))
    ## @brief Return the last executor which actually executed
    # @return Last executor which has @c _hasExecuted == @c True, or the very first executor if we didn't even start yet
    @property
    def lastExecuted(self):
        # Just make sure we have the path traced
        if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
            return None

        lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
        for executorStep in self._executorPath[1:]:
            if self._executorDictionary[executorStep['name']].hasExecuted:
                lastExecutor = self._executorDictionary[executorStep['name']]
        return lastExecutor
    ## @brief Transform report generator
    # @param fast If True ensure that no external calls are made for file metadata (this is
    # used to generate reports in a hurry after a crash or a forced exit)
    # @param fileReport Dictionary giving the type of report to make for each type of file.
    # This dictionary has to have all io types as keys and valid values are:
    # @c None - skip this io type; @c 'full' - provide all details; @c 'name' - only dataset and
    # filename will be reported on.
    # @param reportType Iterable with report types to generate, otherwise a sensible default
    # is used (~everything, plus the Tier0 report at Tier0)
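    # @note Hedged example of a @c fileReport value (the exact structure of
    # @c defaultFileReport is assumed here, not taken from this file):
    # {'input': 'name', 'output': 'full', 'temporary': None}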
    def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
        msg.debug('Transform report generator')
        if 'mpi' in self._argdict and not trfMPITools.mpiShouldValidate():
            msg.debug("Not in rank 0 -- not generating reports")
            return

        if 'reportType' in self._argdict:
            if reportType is not None:
                msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
            reportType = self._argdict['reportType'].value

        if reportType is None:
            reportType = ['json', ]
            # Only generate the Tier0 report at Tier0 ;-)
            # (It causes spurious warnings for some grid jobs with background files (e.g., digitisation))
            if 'TZHOME' in os.environ:
                reportType.append('gpickle')

            if not isInteractiveEnv():
                reportType.append('text')
                msg.debug('Detected non-interactive environment. Enabled text report')

        if 'reportName' in self._argdict:
            baseName = classicName = self._argdict['reportName'].value
        else:
            baseName = 'jobReport'
            classicName = 'metadata'

        try:
            # Text: writes environment variables and machine report in text format.
            if reportType is None or 'text' in reportType:
                envName = baseName if 'reportName' in self._argdict else 'env'  # Use the fallback name 'env.txt' if reportName is not specified
                self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
            if reportType is None or 'json' in reportType:
                self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
            if reportType is None or 'classic' in reportType:
                self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
            if reportType is None or 'gpickle' in reportType:
                self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
            # Pickled version of the JSON report for the pilot
            if reportType is None or 'pilotPickle' in reportType:
                self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)

        except trfExceptions.TransformTimeoutException as reportException:
            msg.error('Received timeout when writing report ({0})'.format(reportException))
            msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
            if ('orphanKiller' in self._argdict):
                infanticide(message=True, listOrphans=True)
            else:
                infanticide(message=True)
            sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))

        except trfExceptions.TransformException as reportException:
            msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
            msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
            msg.critical('Job reports are likely to be missing or incomplete - sorry')
            msg.critical('Please report this as a transforms bug!')
            msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
            msg.critical('Now exiting with a transform internal error code')
            if ('orphanKiller' in self._argdict):
                infanticide(message=True, listOrphans=True)
            else:
                infanticide(message=True)
            sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
    # Description stolen from old trfs...
    ## @brief Common signal handler.
    # @details This function is installed in place of the default signal handler and attempts to terminate the
    # transform gracefully. When a signal is caught by the transform, the stdout from the running application process
    # (i.e. @c athena.py) is allowed to continue uninterrupted and write its stdout to the log file (to retrieve
    # the traceback) before the associated job report records the fact that a signal has been caught and completes
    # the report accordingly.
    # @param signum Signal number. Not used since this is a common handler assigned to predefined signals using
    # @c _installSignalHandlers(). This param is still required to satisfy the requirements of @c signal.signal().
    # @param frame Not used. Provided here to satisfy the requirements of @c signal.signal().
    # @return Does not return. Raises SystemExit exception.
    # @exception SystemExit()
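    # @note The exit code follows the usual shell convention of 128 + signum,
    # e.g. SIGTERM (signal 15) gives exit code 143.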
    def _exitWithReport(self, signum, frame):
        msg.critical('Transform received signal {0}'.format(signum))
        msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
        self._exitCode = 128 + signum
        self._exitMsg = 'Transform received signal {0}'.format(signum)

        # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
        resetTrfSignalHandlers()

        msg.critical('Attempting to write reports with known information...')
        self.generateReport(fast=True)
        if ('orphanKiller' in self._argdict):
            infanticide(message=True, listOrphans=True)
        else:
            infanticide(message=True)

        sys.exit(self._exitCode)
    ## @brief Setter for the transform's validation dictionary
    # @details This function updates the validation dictionary for the transform,
    # updating values which are passed in the @c newValidationOptions argument.
    # @param newValidationOptions Dictionary (or tuples) to update the validation
    # dictionary with
    def updateValidationDict(self, newValidationOptions):
        self.validation.update(newValidationOptions)

    ## @brief Getter function for the transform validation dictionary
    # @return Validation dictionary
    def getValidationDict(self):
        return self.validation

    ## @brief Getter for a specific validation option
    # @param key Validation dictionary key
    # @return Validation key value or @c None if this key is absent
    def getValidationOption(self, key):
        if key in self.validation:
            return self.validation[key]
        else:
            return None
    ## @brief Return a list of fileArgs used by the transform
    # @param io Filter files by their io attribute
    # @return List of argFile instances
    def getFiles(self, io = None):
        res = []
        msg.debug('Looking for file arguments matching: io={0}'.format(io))
        for argName, arg in self._argdict.items():
            if isinstance(arg, argFile):
                msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
                if io is not None and arg.io != io:
                    continue
                msg.debug('Argument {0} matches criteria'.format(argName))
                res.append(arg)
        return res
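    # Hedged usage example: self.getFiles(io='output') returns only those
    # argFile arguments whose io attribute is 'output'.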
    def validateInFiles(self):
        if self._inFileValidationStart is None:
            self._inFileValidationStart = os.times()
            msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))

        if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
            ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
            ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
            ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
            ):
            msg.info('Standard input file validation turned off for transform %s.', self.name)
        else:
            msg.info('Validating input files')
            if 'parallelFileValidation' in self._argdict:
                trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value)
            else:
                trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')

        self._inFileValidationStop = os.times()
        msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
    def validateOutFiles(self):
        if self._outFileValidationStart is None:
            self._outFileValidationStart = os.times()
            msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))

        if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
            ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
            ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
            ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
            ):
            msg.info('Standard output file validation turned off for transform %s.', self.name)
        elif 'mpi' in self._argdict and not trfMPITools.mpiShouldValidate():
            msg.info("MPI mode and not in rank 0 ∴ not validating partial outputs")
        else:
            msg.info('Validating output files')
            parparallelMode = False
            # Make multithreaded (MT) file validation the default
            parmultithreadedMode = True
            if 'parallelFileValidation' in self._argdict:
                parparallelMode = self._argdict['parallelFileValidation'].value
            if 'multithreadedFileValidation' in self._argdict:
                parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
            trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)

        self._outFileValidationStop = os.times()
        msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))