19msg = logging.getLogger(__name__)
22import PyJobTransforms.trfExceptions
as trfExceptions
26from PyJobTransforms.trfArgs
import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
29from PyJobTransforms.trfExeStepTools
import executorStepSuffix, getTotalExecutorSteps
def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
             trfName = None, executor = None, exeArgs = None, description = ''):
    '''Transform class initialiser

    @param standardSignalHandlers If True, install the transform's standard signal handlers
    @param standardTrfArgs If True, add the standard transform arguments to the parser
    @param standardValidationArgs If True, add validation and file validation arguments to the parser
    @param trfName Transform name; defaults to the invoked script's basename with any '.py' suffix stripped
    @param executor Executor instance, or iterable of executors, appended to the executor set
    @param exeArgs NOTE(review): accepted but not used in the visible part of this initialiser - confirm
    @param description Human readable description, used in the argument parser help text
    '''
    msg.debug('Welcome to ATLAS job transforms')

    ## @brief Get transform starting timestamp as early as possible
    self._transformStart = os.times()
    msg.debug('transformStart time is {0}'.format(self._transformStart))

    # File validation timestamps; set later by validateInFiles()/validateOutFiles()
    self._inFileValidationStart = None
    self._inFileValidationStop = None
    self._outFileValidationStart = None
    self._outFileValidationStop = None

    ## @brief Get trf pre-data as early as possible
    self._trfPredata = os.environ.get('TRF_PREDATA')

    # Transform name falls back to the script's basename (minus '.py')
    self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]

    ## @note Holder for arguments this trf understands
    # Use @c argparse.SUPPRESS to have non-given arguments unset, rather than None
    # Support reading arguments from a file using the notation @c @file
    self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
                               argument_default=argparse.SUPPRESS,
                               fromfile_prefix_chars='@')

    # NOTE(review): presumably guarded by 'if standardTrfArgs:' in elided code - confirm
    addStandardTrfArgs(self.parser)

    if standardValidationArgs:
        addValidationArguments(self.parser)
        addFileValidationArguments(self.parser)

    ## Argument dictionary for this transform
    self._argdict = dict()

    ## Data dictionary place holder (this maps data types to their argFile instances)
    self._dataDictionary = dict()

    # Transform executor list - initialise with an empty set
    self._executors = set()
    self._executorDictionary = {}

    # Append the given executors or a default one to the set:
    if executor is not None:
        # NOTE(review): the 'or {transformExecutor()}' fallback only fires for a falsy
        # (e.g. empty) executor container, never for None (excluded above) - confirm intent
        self.appendToExecutorSet(executor or {transformExecutor()})

    ## Transform exit code/message holders
    # NOTE(review): the corresponding assignments are elided from this excerpt

    ## Report object for this transform
    self._report = trfJobReport(parentTrf = self)

    ## Transform processed events
    self._processedEvents = None

    # Setup standard signal handling if asked
    if standardSignalHandlers:
        setTrfSignalHandlers(self._exitWithReport)
        msg.debug('Standard signal handlers established')
# NOTE(review): fragment of the 'exitCode' property getter - its decorator/def lines are elided
if self._exitCode is None:
    # Unset exit code maps to the generic TRF_UNKNOWN code
    msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
    return trfExit.nameToCode('TRF_UNKNOWN')
return self._exitCode
# NOTE(review): fragment of the 'exitMsg' property getter - decorator/def and return statements are elided
if self._exitMsg is None:
    msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
def dataDictionary(self):
    '''Getter for the transform's data dictionary (maps data types to their argFile instances).'''
    return self._dataDictionary
def transformStart(self):
    '''Getter for the transform start timestamp (os.times() structure captured in the initialiser).'''
    return self._transformStart
def transformSetupCpuTime(self):
    '''CPU time spent between transform start and the start of input file validation.

    Returns None when either timestamp has not been recorded yet.
    '''
    if self._transformStart and self._inFileValidationStart:
        return calcCpuTime(self._transformStart, self._inFileValidationStart)
    return None
def transformSetupWallTime(self):
    '''Wall time spent between transform start and the start of input file validation.

    Returns None when either timestamp has not been recorded yet.
    '''
    bothKnown = self._transformStart and self._inFileValidationStart
    return calcWallTime(self._transformStart, self._inFileValidationStart) if bothKnown else None
def inFileValidationCpuTime(self):
    '''CPU time used by input file validation, or None if validation has not run to completion.'''
    if not (self._inFileValidationStart and self._inFileValidationStop):
        return None
    return calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
def inFileValidationWallTime(self):
    '''Wall time used by input file validation, or None if validation has not run to completion.'''
    start, stop = self._inFileValidationStart, self._inFileValidationStop
    if start and stop:
        return calcWallTime(start, stop)
    return None
def outFileValidationCpuTime(self):
    '''CPU time used by output file validation, or None if validation has not run to completion.'''
    if self._outFileValidationStart and self._outFileValidationStop:
        return calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
    return None
def outFileValidationWallTime(self):
    '''Wall time used by output file validation, or None if validation has not run to completion.'''
    haveBoth = self._outFileValidationStart and self._outFileValidationStop
    if not haveBoth:
        return None
    return calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
def outFileValidationStop(self):
    '''Getter for the output file validation stop timestamp (os.times() structure, or None).'''
    return self._outFileValidationStop
def trfPredata(self):
    '''Getter for the TRF_PREDATA environment value captured at initialisation (may be None).'''
    return self._trfPredata
# NOTE(review): fragment of the 'executors' property getter - its decorator/def lines are elided
return self._executors
def processedEvents(self):
    '''Getter for the number of events processed by this transform (None until set by execute).'''
    return self._processedEvents
def getProcessedEvents(self):
    '''Read the event count from the first executor on the execution path.

    NOTE(review): the initialisation of nEvts and the final 'return nEvts'
    statement are elided from this excerpt.
    '''
    for executionStep in self._executorPath:
        executor = self._executorDictionary[executionStep['name']]
        # Only the first executor's event count is taken as the transform total
        if executor.conf.firstExecutor:
            nEvts = executor.eventCount
def appendToExecutorSet(self, executors):
    '''Add executor(s) to this transform's executor set and name lookup dictionary.

    @param executors A single transformExecutor or a list/tuple/set of them
    @raises trfExceptions.TransformInternalException for a non-executor argument
            or a duplicate executor name
    '''
    # Normalise to something iterable
    if isinstance(executors, transformExecutor):
        executors = [executors,]
    elif not isinstance(executors, (list, tuple, set)):
        raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                                                       'Transform was initialised with an executor which was not a simple executor or an executor set')

    # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
    # Executor book keeping: set parent link back to me for all executors
    # Also setup a dictionary, indexed by executor name and check that name is unique
    ## Setting conf here not working - too early to get the dataDictionary
    for executor in executors:
        # Executor names must be unique within one transform
        if executor.name in self._executorDictionary:
            raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                                                           'Transform has been initialised with two executors with the same name ({0})'
                                                           ' - executor names must be unique'.format(executor.name))
        self._executors.add(executor)
        self._executorDictionary[executor.name] = executor
## @brief Parse command line arguments for a transform
def parseCmdLineArgs(self, args):
    '''Parse command line arguments, then merge in extra parameters from AMI tags,
    a JSON arguments file and the eventService option; finally handle the
    dumpPickle/dumpJSON early-exit options and set the global log level.

    @param args List of command line argument strings (typically sys.argv[1:])

    NOTE(review): several statements are elided from this excerpt; elision points
    are marked inline. The 'try:' matching the except clauses below is also elided.
    '''
    msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))

        # Use the argparse infrastructure to get the actual command line arguments
        self._argdict=vars(self.parser.parse_args(args))

        # Need to know if any input or output files were set - if so then we suppress the
        # corresponding parameters from AMI
        inputFiles = outputFiles = False
        for k, v in self._argdict.items():
            if k.startswith('input') and isinstance(v, argFile):
                # ... ('inputFiles = True' elided)
            elif k.startswith('output') and isinstance(v, argFile):
                # ... ('outputFiles = True' elided)
        msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))

        # Now look for special arguments, which expand out to other parameters
        # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
        # (However, we defend the real command line against updates from either source)
        # ... ('extraParameters = {}' initialisation elided)
        if 'AMIConfig' in self._argdict:
            msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
            from PyJobTransforms.trfAMI import TagInfo
            tag=TagInfo(self._argdict['AMIConfig'].value)
            # ... ('updateDict = {}' initialisation elided)
            for k, v in dict(tag.trfs[0]).items():
                # Convert to correct internal key form
                # Command line file arguments take precedence over AMI tag values
                if inputFiles and k.startswith('input'):
                    msg.debug('Suppressing argument {0} from AMI'
                              ' because input files have been specified on the command line'.format(k))
                    # ... ('continue' elided)
                if outputFiles and k.startswith('output'):
                    msg.debug('Suppressing argument {0} from AMI'
                              ' because output files have been specified on the command line'.format(k))
                    # ... ('continue' and the updateDict assignment elided)
            extraParameters.update(updateDict)

        # JSON-encoded arguments file
        if 'argJSON' in self._argdict:
            # ... (inner 'try:' elided)
                msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
                argfile = open(self._argdict['argJSON'], 'r')
                jsonParams = json.load(argfile)
                msg.debug('Read: {0}'.format(jsonParams))
                extraParameters.update(convertToStr(jsonParams))
                # NOTE(review): argfile is not closed in the visible code - consider 'with open(...)';
                # the close may be in an elided line, confirm
            except Exception as e:
                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))

        # Event service mode forces specific merge/validation settings
        if 'eventService' in self._argdict and self._argdict['eventService'].value:
            # ... ('updateDict = {}' initialisation elided)
            updateDict['athenaMPMergeTargetSize'] = '*:0'
            updateDict['checkEventCount'] = False
            updateDict['outputFileValidation'] = False
            extraParameters.update(updateDict)

        # Process anything we found
        # List of command line arguments
        argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
        for k,v in extraParameters.items():
            msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
            if k not in self.parser._argClass and k not in self.parser._argAlias:
                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
            # Check if it is an alias
            if k in self.parser._argAlias:
                msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
                k = self.parser._argAlias[k]
            # Check if argument has already been set on the command line
            # ... (the 'if k in argsList:' style guard is elided)
                msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
                # ... ('continue' elided)
            # For callable classes we instantiate properly, otherwise we set the value for simple arguments
            if '__call__' in dir(self.parser._argClass[k]):
                self._argdict[k] = self.parser._argClass[k](v)
            # ... (simple-value 'else:' branch elided)
            msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))

        # Set the key name as an argument property - useful to be able to look back at where this
        # argument came from
        for k, v in self._argdict.items():
            if isinstance(v, argument):
                # ... (name assignment, presumably 'v.name = k', elided - confirm)
            elif isinstance(v, list):
                # ... (loop over the list elements elided)
                if isinstance(it, argument):
                    # ... (per-element name assignment elided)

        # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
        if 'dumpPickle' in self._argdict:
            msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
            pickledDump(self._argdict)
            # ... (early exit, presumably 'sys.exit(0)', elided)

        # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
        if 'dumpJSON' in self._argdict:
            msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
            JSONDump(self._argdict)
            # ... (early exit, presumably 'sys.exit(0)', elided)

    except trfExceptions.TransformArgException as e:
        # Argument errors: record, write a fast report and exit
        msg.critical('Argument parsing failure: {0!s}'.format(e))
        self._exitCode = e.errCode
        self._exitMsg = e.errMsg
        self._report.fast = True
        self.generateReport()
        sys.exit(self._exitCode)

    except trfExceptions.TransformAMIException as e:
        # AMI errors: exit without a report
        msg.critical('AMI failure: {0!s}'.format(e))
        self._exitCode = e.errCode
        self._exitMsg = e.errMsg
        sys.exit(self._exitCode)

    self.setGlobalLogLevel()
## @brief Check transform argument dictionary and set the correct root logger option
def setGlobalLogLevel(self):
    '''Set the root logger level from the 'verbose' or 'loglevel' arguments, if present.

    'verbose' wins and forces DEBUG; otherwise a recognised 'loglevel' value is applied.
    '''
    if 'verbose' in self._argdict:
        setRootLoggerLevel(stdLogLevels['DEBUG'])
    elif 'loglevel' in self._argdict:
        if self._argdict['loglevel'] in stdLogLevels:
            msg.info("Loglevel option found - setting root logger level to %s",
                     logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
            setRootLoggerLevel(stdLogLevels[self._argdict['loglevel']])
        # ... ('else:' elided)
            msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
## @brief Execute transform
# @details This function calls the actual transform execution class and
# sets \c self.exitCode, \c self.exitMsg and \c self.processedEvents transform data members.
# NOTE(review): excerpt fragment - the 'def execute(self):' header and its enclosing
# 'try:' are elided; indentation below reflects relative nesting only, and further
# elision points are marked inline.
    msg.debug('Entering transform execution phase')

    # Warn if a CA-based transform has deprecated command line args
    # Print warning once per deprecated arg
    deprecationWarningPrinted = False
    for exe in self._executors:
        if isinstance(exe, athenaExecutor):
            if (not deprecationWarningPrinted) and exe.skeletonCA:
                toBeRemoved = ["autoConfiguration","trigStream","topOptions","valid"]
                for deprecatedArg in toBeRemoved:
                    if deprecatedArg in self._argdict:
                        msg.warning("!!!Detected use of "+deprecatedArg+" in command line arguments for CA-based transform!!!")
                        msg.warning(deprecatedArg+" is DEPRECATED and due for removal. Applying it will not do anything, please remove it from your Transform definition.")
                        deprecationWarningPrinted = True
                        msg.debug(deprecatedArg+" detected in executor "+exe.name)

    # Intercept a few special options here
    if 'dumpargs' in self._argdict:
        self.parser.dumpArgs()
        # ... (early exit elided)

    msg.info('Resolving execution graph')
    # ... (graph setup call, presumably 'self._setupGraph()', elided)

    if 'showSteps' in self._argdict:
        for exe in self._executors:
            print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
            if msg.level <= logging.DEBUG:
                print(" {0} -> {1}".format(exe.inData, exe.outData))
        # ... (early exit elided)

    if 'showGraph' in self._argdict:
        print(self._executorGraph)
        # ... (early exit elided)

    msg.info('Starting to trace execution path')
    # ... (path tracing call, presumably 'self._tracePath()', elided)
    msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
                                                                 ' '.join([exe['name'] for exe in self._executorPath])))

    if 'showPath' in self._argdict:
        msg.debug('Execution path list is: {0}'.format(self._executorPath))
        print('Executor path is:')
        for node in self._executorPath:
            print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
        # ... (early exit elided)

    msg.debug('Execution path is {0}'.format(self._executorPath))

    # Prepare files for execution (separate method?)
    for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
        if dataType in self._dataDictionary:
            msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
        # ... ('else:' elided - the following builds a temporary file argument)
            fileName = 'tmp.' + dataType
            # How to pick the correct argFile class?
            for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
                stdArgName = prefix + dataType + suffix
                if stdArgName in self.parser._argClass:
                    msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
                    self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
                    self._dataDictionary[dataType].io = 'temporary'
                    # ... (loop exit, presumably 'break', elided)
            if dataType not in self._dataDictionary:
                if 'HIST' in fileName:
                    self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
                # ... ('else:' elided)
                    self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
                msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
                self._dataDictionary[dataType].name = fileName

    # Do splitting if required
    self.setupSplitting()

    # Error if more than one executor in MPI mode
    if 'mpi' in self._argdict:
        if len(self._executorPath) > 1:
            msg.error("MPI mode is not supported for jobs with more than one execution step!")
            msg.error(f"We have {len(self._executorPath)}: {self._executorPath}")
            # ... (error raise/exit elided)

    # Now we can set the final executor configuration properly, with the final dataDictionary
    for executor in self._executors:
        executor.conf.setFromTransform(self)

    self.validateInFiles()

    for executionStep in self._executorPath:
        msg.debug('Now preparing to execute {0}'.format(executionStep))
        executor = self._executorDictionary[executionStep['name']]
        executor.preExecute(input = executionStep['input'], output = executionStep['output'])
        # ... (the actual 'executor.execute()' call is presumably elided here - confirm)
        executor.postExecute()

    # Swap out the output files for the version with [] lists expanded
    if 'mpi' in self._argdict:
        # NOTE(review): 'executor' here is the last loop variable from the step loop above
        new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig["outputs"]}
        self._dataDictionary = new_data_dict
        executor.conf._dataDictionary = new_data_dict

    self._processedEvents = self.getProcessedEvents()
    self.validateOutFiles()

    msg.debug('Transform executor succeeded')
    # ... (success exit code assignment, presumably 'self._exitCode = 0', elided)
    self._exitMsg = trfExit.codeToName(self._exitCode)

except trfExceptions.TransformNeedCheckException as e:
    msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
    self._exitCode = e.errCode
    self._exitMsg = e.errMsg
    self.generateReport(fast=False)

except trfExceptions.TransformException as e:
    msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
    self._exitCode = e.errCode
    self._exitMsg = e.errMsg
    # Try and write a job report...
    self.generateReport(fast=True)

# ... (a 'finally:' and an exit-code guard are presumably elided around the cleanup below)
    # Clean up any orphaned processes and exit here if things went bad
    infanticide(message=True)
    msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
    sys.exit(self._exitCode)
## @brief Setup the executor graph
# @note This function might need to be called again when the number of 'substeps' is unknown
# just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD
# steps they need to run until after digitisation.
def _setupGraph(self):
    '''Collect input/output data types from the argument dictionary, then build and toposort
    the executor graph.

    NOTE(review): several branch header lines (e.g. 'if m:' and the output elif branches)
    are elided from this excerpt; elision points are marked inline.
    '''
    # Get input/output data
    self._inputData = list()
    self._outputData = list()

    for key, value in self._argdict.items():
        # Note specifier [A-Za-z0-9_]+? makes this match non-greedy (avoid swallowing the optional 'File' suffix)
        m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
        # N.B. Protect against taking arguments which are not type argFile
        # ... (the 'if m:' guard is elided)
        if isinstance(value, argFile):
            if m.group(1) == 'input':
                self._inputData.append(m.group(2))
            # ... (the output branch header is elided)
                self._outputData.append(m.group(2))
            self._dataDictionary[m.group(2)] = value
        elif isinstance(value, list) and value and isinstance(value[0], argFile):
            if m.group(1) == 'input':
                self._inputData.append(m.group(2))
            # ... (the output branch header is elided)
                self._outputData.append(m.group(2))
            self._dataDictionary[m.group(2)] = value

    ## @note If we have no real data then add the pseudo datatype NULL, which allows us to manage
    # transforms which can run without data
    if len(self._inputData) == 0:
        self._inputData.append('inNULL')
    if len(self._outputData) == 0:
        self._outputData.append('outNULL')
    msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))

    # Now see if we have any steering - manipulate the substep inputs and outputs before we
    # build the graph
    if 'steering' in self._argdict:
        msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
        # ... (steering call, presumably 'self._doSteering()', elided)

    # Setup the graph and topo sort it
    self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
    self._executorGraph.doToposort()
## @brief Setup executor splitting
def setupSplitting(self):
    '''Expand steps on the execution path into multiple substeps per the 'splitConfig' argument.

    NOTE(review): the early 'return' and the derivation of 'split' (used below) are elided
    from this excerpt; further elisions are marked inline.
    '''
    if 'splitConfig' not in self._argdict:
        # ... (early 'return' elided)

    for executionStep in self._executorPath:
        baseStepName = executionStep['name']
        # 'split' is defined in elided code - presumably the step names from splitConfig
        if baseStepName in split:
            baseExecutor = self._executorDictionary[baseStepName]
            splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
            # ... (guard for splitting <= 1 presumably elided)
            msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
            index = self._executorPath.index(executionStep)
            baseStep = self._executorPath.pop(index)
            for i in range(splitting):
                # Each substep gets the base name plus the step suffix and its index
                name = baseStepName + executorStepSuffix + str(i)
                step = copy.deepcopy(baseStep)
                # ... (step rename, presumably "step['name'] = name", elided)
                self._executorPath.insert(index + i, step)
                executor = copy.deepcopy(baseExecutor)
                # ... (executor rename elided)
                executor.conf.executorStep = i
                executor.conf.totalExecutorSteps = splitting
                self._executors.add(executor)
                self._executorDictionary[name] = executor
## @brief Trace the path through the executor graph
# @note This function might need to be called again when the number of 'substeps' is unknown
# just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD
# steps they need to run until after digitisation.
def _tracePath(self):
    '''Find the execution path through the executor graph and flag its first executor.

    @raises trfExceptions.TransformSetupException if no execution path could be found
    '''
    self._executorGraph.findExecutionPath()

    self._executorPath = self._executorGraph.execution
    if len(self._executorPath) == 0:
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
                                                    'Execution path finding resulted in no substeps being executed'
                                                    '(Did you correctly specify input data for this transform?)')
    # Tell the first executor that they are the first
    self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
## @brief Setup steering, which manipulates the graph before we trace the path
# @param steeringDict Manual steering dictionary (if specified, used instead of the
# steering from the @c steering argument - pay attention to the input structure!
def _doSteering(self, steeringDict = None):
    '''Apply steering tuples of the form (in/out, +/-, datatype) to the executors' data sets.

    @raises trfExceptions.TransformSetupException on an impossible add/remove or an
            unknown executor/substep name

    NOTE(review): several guard/else/elif header lines are elided from this excerpt;
    elision points are marked inline.
    '''
    # ... (the 'use argdict steering only when steeringDict is None' guard is elided)
    steeringDict = self._argdict['steering'].value
    for substep, steeringValues in steeringDict.items():
        # ... (substep-match bookkeeping presumably elided)
        for executor in self._executors:
            if executor.name == substep or executor.substep == substep:
                msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
                # Steering consists of tuples with (in/out, +/-, datatype)
                for steeringValue in steeringValues:
                    if steeringValue[0] == 'in':
                        startSet = executor.inData
                    # ... (the 'out' branch header is elided)
                        startSet = executor.outData
                    origLen = len(startSet)
                    msg.debug('Data values to be modified are: {0}'.format(startSet))
                    if steeringValue[1] == '+':
                        startSet.add(steeringValue[2])
                        # Length check detects an add of an already-present datatype
                        if len(startSet) != origLen + 1:
                            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                                                        'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                    # ... (the '-' branch header is elided)
                        startSet.discard(steeringValue[2])
                        # Length check detects a remove of a missing datatype
                        if len(startSet) != origLen - 1:
                            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                                                        'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                    msg.debug('Updated data values to: {0}'.format(startSet))
        # ... (the no-matching-executor guard is elided; the raise below fires in that case)
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                                    'This transform has no executor/substep {0}'.format(substep))
## @brief Return the last executor which actually executed
# @return Last executor which has @c _hasExecuted == @c True, or the very first executor if we didn't even start yet
def lastExecuted(self):
    '''Scan the execution path for the most recent executor with hasExecuted set.

    NOTE(review): the early-return for a missing/empty path and the final
    'return lastExecutor' are elided from this excerpt.
    '''
    # Just make sure we have the path traced
    if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
        # ... (early return elided)

    # Start from the first executor, then advance over every executed step
    lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
    for executorStep in self._executorPath[1:]:
        if self._executorDictionary[executorStep['name']].hasExecuted:
            lastExecutor = self._executorDictionary[executorStep['name']]
    # ... ('return lastExecutor' elided)
## @brief Transform report generator
# @param fast If True ensure that no external calls are made for file metadata (this is
# used to generate reports in a hurry after a crash or a forced exit)
# @param fileReport Dictionary giving the type of report to make for each type of file.
# This dictionary has to have all io types as keys and valid values are:
# @c None - skip this io type; @c 'full' - Provide all details; @c 'name' - only dataset and
# filename will be reported on.
# @param reportType Iterable with report types to generate, otherwise a sensible default
# is used (~everything, plus the Tier0 report at Tier0)
def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
    '''Write the transform job reports in the requested formats.

    NOTE(review): the 'try:' matching the except clauses below, plus several 'else:'
    branches and early returns, are elided from this excerpt (marked inline).
    '''
    msg.debug('Transform report generator')
    # NOTE(review): uses 'self.argdict' - presumably a property alias for self._argdict
    # defined in elided code; confirm it exists
    if 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
        msg.debug("Not in rank 0 -- not generating reports")
        # ... (early 'return' elided)

    if 'reportType' in self._argdict:
        if reportType is not None:
            msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
        reportType = self._argdict['reportType'].value

    if reportType is None:
        reportType = ['json', ]
        # Only generate the Tier0 report at Tier0 ;-)
        # (It causes spurious warnings for some grid jobs with background files (e.g., digitisation)
        if 'TZHOME' in os.environ:
            reportType.append('gpickle')

        if not isInteractiveEnv():
            reportType.append('text')
            msg.debug('Detected Non-Interactive environment. Enabled text report')

    if 'reportName' in self._argdict:
        baseName = classicName = self._argdict['reportName'].value
    # ... ('else:' elided)
        baseName = 'jobReport'
        classicName = 'metadata'

    # ... (the 'try:' matching the except clauses below is elided)
        # Text: Writes environment variables and machine report in text format.
        if reportType is None or 'text' in reportType:
            envName = baseName if 'reportName' in self._argdict else 'env' # Use fallback name 'env.txt' if it's not specified.
            self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
        # JSON report (the default)
        if reportType is None or 'json' in reportType:
            self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
        # Classic XML metadata report
        if reportType is None or 'classic' in reportType:
            self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
        # Tier0 gpickle report
        if reportType is None or 'gpickle' in reportType:
            self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
        # Pickled version of the JSON report for pilot
        if reportType is None or 'pilotPickle' in reportType:
            self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)

    except trfExceptions.TransformTimeoutException as reportException:
        msg.error('Received timeout when writing report ({0})'.format(reportException))
        msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
        if ('orphanKiller' in self._argdict):
            infanticide(message=True, listOrphans=True)
        # ... ('else:' elided)
            infanticide(message=True)
        sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))

    except trfExceptions.TransformException as reportException:
        # Report generation itself failed - this is an internal error
        msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
        msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
        msg.critical('Job reports are likely to be missing or incomplete - sorry')
        msg.critical('Please report this as a transforms bug!')
        msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
        msg.critical('Now exiting with a transform internal error code')
        if ('orphanKiller' in self._argdict):
            infanticide(message=True, listOrphans=True)
        # ... ('else:' elided)
            infanticide(message=True)
        sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
# Description stolen from old trfs...
## @brief Common signal handler.
# @details This function is installed in place of the default signal handler and attempts to terminate the
# transform gracefully. When a signal is caught by the transform, the stdout from the running application process
# (i.e. @c athena.py) is allowed to continue uninterrupted and write it's stdout to the log file (to retrieve
# the traceback) before the associated job report records the fact that a signal has been caught and complete
# the report accordingly.
# @param signum Signal number. Not used since this is a common handle assigned to predefined signals using the
# @c _installSignalHandlers(). This param is still required to satisfy the requirements of @c signal.signal().
# @param frame Not used. Provided here to satisfy the requirements of @c signal.signal().
# @return Does not return. Raises SystemExit exception.
# @exception SystemExit()
def _exitWithReport(self, signum, frame):
    msg.critical('Transform received signal {0}'.format(signum))
    msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
    # Standard convention: exit code 128 + signal number
    self._exitCode = 128+signum
    self._exitMsg = 'Transform received signal {0}'.format(signum)

    # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
    resetTrfSignalHandlers()

    msg.critical('Attempting to write reports with known information...')
    self.generateReport(fast=True)
    if ('orphanKiller' in self._argdict):
        infanticide(message=True, listOrphans=True)
    # ... ('else:' elided)
        infanticide(message=True)

    sys.exit(self._exitCode)
## @brief Setter for transform's validation dictionary
# @details This function updates the validation dictionary for the transform,
# updating values which are passed in the \c newValidationOptions argument.
# @param newValidationOptions Dictionary (or tuples) to update validation
def updateValidationDict(self, newValidationOptions):
    '''Merge newValidationOptions into this transform's validation dictionary.'''
    self.validation.update(newValidationOptions)
## @brief Getter function for transform validation dictionary
# @return Validation dictionary
def getValidationDict(self):
    '''Return the transform's validation dictionary (the live object, not a copy).'''
    return self.validation
## @brief Getter for a specific validation option
# @param key Validation dictionary key
# @return Validation key value or @c None if this key is absent
def getValidationOption(self, key):
    '''Look up one entry of the validation dictionary.

    NOTE(review): the 'else: return None' branch is elided from this excerpt.
    '''
    if key in self.validation:
        return self.validation[key]
    # ... ('return None' for a missing key elided)
## @brief Return a list of fileArgs used by the transform
# @param \c io Filter files by io attribute
# @return List of argFile instances
def getFiles(self, io = None):
    '''Collect argFile arguments from the argument dictionary, optionally filtered by io.

    NOTE(review): the result-list initialisation, the filter 'continue', the append and
    the final return are elided from this excerpt.
    '''
    # ... (result-list initialisation elided)
    msg.debug('Looking for file arguments matching: io={0}'.format(io))
    for argName, arg in self._argdict.items():
        if isinstance(arg, argFile):
            msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
            # Skip files whose io attribute does not match the requested filter
            if io is not None and arg.io != io:
                # ... ('continue' elided)
            msg.debug('Argument {0} matches criteria'.format(argName))
            # ... (append to the result list and final return elided)
def validateInFiles(self):
    '''Run standard validation over the transform's input files, unless disabled by arguments.

    Records start/stop timestamps in _inFileValidationStart/_inFileValidationStop.
    NOTE(review): the closing '):' of the multi-line condition and the 'else:' branch
    headers are elided from this excerpt (marked inline).
    '''
    if self._inFileValidationStart is None:
        self._inFileValidationStart = os.times()
        msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))

    # Any one of these arguments switches input file validation off
    if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
        ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
        ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
        ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
        # ... (closing '):' of the condition elided)
        msg.info('Standard input file validation turned off for transform %s.', self.name)
    # ... ('else:' elided)
        msg.info('Validating input files')
        if 'parallelFileValidation' in self._argdict:
            trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
        # ... ('else:' elided)
            trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')

    self._inFileValidationStop = os.times()
    msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
def validateOutFiles(self):
    '''Run standard validation over the transform's output files, unless disabled by arguments
    or skipped for non-rank-0 MPI workers.

    Records start/stop timestamps in _outFileValidationStart/_outFileValidationStop.
    NOTE(review): the closing '):' of the multi-line condition and some branch headers
    are elided from this excerpt (marked inline).
    '''
    if self._outFileValidationStart is None:
        self._outFileValidationStart = os.times()
        msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))

    # Any one of these arguments switches output file validation off
    if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
        ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
        ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
        ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
        # ... (closing '):' of the condition elided)
        msg.info('Standard output file validation turned off for transform %s.', self.name)
    # NOTE(review): uses 'self.argdict' - presumably a property alias for self._argdict; confirm
    elif 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
        msg.info("MPI mode and not in rank 0 ∴ not validating partial outputs")
    # ... ('else:' elided)
        msg.info('Validating output files')
        parparallelMode = False
        # Make MT file validation default
        parmultithreadedMode = True
        if 'parallelFileValidation' in self._argdict:
            parparallelMode = self._argdict['parallelFileValidation'].value
        if 'multithreadedFileValidation' in self._argdict:
            parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
        trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)

    self._outFileValidationStop = os.times()
    msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))