transform.py

# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration

import argparse
import copy
import os
import os.path as path
import re
import sys
import traceback

import logging
msg = logging.getLogger(__name__)

import PyJobTransforms.trfValidation as trfValidation
import PyJobTransforms.trfExceptions as trfExceptions
import PyJobTransforms.trfMPITools as trfMPITools

from PyJobTransforms.trfSignal import setTrfSignalHandlers, resetTrfSignalHandlers
from PyJobTransforms.trfArgs import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
from PyJobTransforms.trfLogger import setRootLoggerLevel, stdLogLevels
from PyJobTransforms.trfArgClasses import trfArgParser, argFile, argHISTFile, argument
from PyJobTransforms.trfExeStepTools import executorStepSuffix, getTotalExecutorSteps
from PyJobTransforms.trfExitCodes import trfExit
from PyJobTransforms.trfUtils import shQuoteStrings, infanticide, pickledDump, JSONDump, cliToKey, convertToStr
from PyJobTransforms.trfUtils import isInteractiveEnv, calcCpuTime, calcWallTime
from PyJobTransforms.trfReports import trfJobReport, defaultFileReport
from PyJobTransforms.trfExe import transformExecutor
from PyJobTransforms.trfGraph import executorGraph


## @brief Core transform class
class transform(object):
    ## @brief Initialise a job transform
    def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
                 trfName = None, executor = None, exeArgs = None, description = ''):
        '''Transform class initialiser'''
        msg.debug('Welcome to ATLAS job transforms')

        ## @brief Get transform starting timestamp as early as possible
        self._transformStart = os.times()
        msg.debug('transformStart time is {0}'.format(self._transformStart))

        self._inFileValidationStart = None
        self._inFileValidationStop = None
        self._outFileValidationStart = None
        self._outFileValidationStop = None

        ## @brief Get trf pre-data as early as possible
        self._trfPredata = os.environ.get('TRF_PREDATA')

        ## Transform name
        self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]

        ## @note Holder for arguments this trf understands
        #  Use @c argparse.SUPPRESS to have non-given arguments unset, rather than None
        #  Support reading arguments from a file using the notation @c @file
        self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
                                   argument_default=argparse.SUPPRESS,
                                   fromfile_prefix_chars='@')
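
        # Hedged illustration of the @c @file notation enabled above (file and
        # argument names here are hypothetical, not taken from a real transform):
        #
        #   $ cat myargs.txt
        #   --inputBSFile=data.RAW
        #   --outputESDFile=out.ESD.pool.root
        #   $ mytransform.py @myargs.txt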

        if standardTrfArgs:
            addStandardTrfArgs(self.parser)

        if standardValidationArgs:
            addValidationArguments(self.parser)
            addFileValidationArguments(self.parser)

        ## Argument dictionary for this transform
        self._argdict = dict()

        ## Data dictionary placeholder (this maps data types to their argFile instances)
        self._dataDictionary = dict()

        ## Validation options dictionary, consulted by the validation getter/setter methods below
        self.validation = dict()

        # Transform executor list - initialise with an empty set
        self._executors = set()
        self._executorDictionary = {}

        # Append the given executors to the set (an empty, non-None container
        # falls back to a single default transformExecutor):
        if executor is not None:
            self.appendToExecutorSet(executor or {transformExecutor()})

        ## Transform exit code/message holders
        self._exitCode = None
        self._exitMsg = None

        ## Report object for this transform
        self._report = trfJobReport(parentTrf = self)

        ## Transform processed events
        self._processedEvents = None

        # Setup standard signal handling if asked
        if standardSignalHandlers:
            setTrfSignalHandlers(self._exitWithReport)
            msg.debug('Standard signal handlers established')


    @property
    def name(self):
        return self._name

    @property
    def exitCode(self):
        if self._exitCode is None:
            msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
            return trfExit.nameToCode('TRF_UNKNOWN')
        else:
            return self._exitCode

    @property
    def exitMsg(self):
        if self._exitMsg is None:
            msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
            return ''
        else:
            return self._exitMsg

    @property
    def argdict(self):
        return self._argdict

    @property
    def dataDictionary(self):
        return self._dataDictionary

    @property
    def report(self):
        return self._report

    @property
    def transformStart(self):
        return self._transformStart

    @property
    def transformSetupCpuTime(self):
        transformSetupCpuTime = None
        if self._transformStart and self._inFileValidationStart:
            transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)

        return transformSetupCpuTime

    @property
    def transformSetupWallTime(self):
        transformSetupWallTime = None
        if self._transformStart and self._inFileValidationStart:
            transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)

        return transformSetupWallTime

    @property
    def inFileValidationCpuTime(self):
        inFileValidationCpuTime = None
        if self._inFileValidationStart and self._inFileValidationStop:
            inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)

        return inFileValidationCpuTime

    @property
    def inFileValidationWallTime(self):
        inFileValidationWallTime = None
        if self._inFileValidationStart and self._inFileValidationStop:
            inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)

        return inFileValidationWallTime

    @property
    def outFileValidationCpuTime(self):
        outFileValidationCpuTime = None
        if self._outFileValidationStart and self._outFileValidationStop:
            outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)

        return outFileValidationCpuTime

    @property
    def outFileValidationWallTime(self):
        outFileValidationWallTime = None
        if self._outFileValidationStart and self._outFileValidationStop:
            outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)

        return outFileValidationWallTime

    @property
    def outFileValidationStop(self):
        return self._outFileValidationStop

    @property
    def trfPredata(self):
        return self._trfPredata

    @property
    def executors(self):
        return self._executors

    @property
    def processedEvents(self):
        return self._processedEvents

    def getProcessedEvents(self):
        nEvts = None
        for executionStep in self._executorPath:
            executor = self._executorDictionary[executionStep['name']]
            if executor.conf.firstExecutor:
                nEvts = executor.eventCount
        return nEvts

    def appendToExecutorSet(self, executors):
        # Normalise to something iterable
        if isinstance(executors, transformExecutor):
            executors = [executors,]
        elif not isinstance(executors, (list, tuple, set)):
            raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                                                           'Transform was initialised with an executor which was not a simple executor or an executor set')

        # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
        # Executor bookkeeping: set the parent link back to this transform for all executors.
        # Also set up a dictionary, indexed by executor name, checking that each name is unique.
        ## Setting conf here does not work - it is too early to get the dataDictionary
        for executor in executors:
            executor.trf = self
            if executor.name in self._executorDictionary:
                raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                                                               'Transform has been initialised with two executors with the same name ({0})'
                                                               ' - executor names must be unique'.format(executor.name))
            self._executors.add(executor)
            self._executorDictionary[executor.name] = executor

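    # Hedged usage sketch (executor names are illustrative): executors can be
    # appended singly or as a container, before the graph is set up:
    #
    #   trf.appendToExecutorSet(transformExecutor(name='firstStep'))
    #   trf.appendToExecutorSet({transformExecutor(name='stepA'),
    #                            transformExecutor(name='stepB')})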

    ## @brief Parse command line arguments for a transform
    def parseCmdLineArgs(self, args):
        msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))

        try:
            # Use the argparse infrastructure to get the actual command line arguments
            self._argdict = vars(self.parser.parse_args(args))

            # Need to know if any input or output files were set - if so then we suppress the
            # corresponding parameters from AMI
            inputFiles = outputFiles = False
            for k, v in self._argdict.items():
                if k.startswith('input') and isinstance(v, argFile):
                    inputFiles = True
                elif k.startswith('output') and isinstance(v, argFile):
                    outputFiles = True
            msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))

            # Now look for special arguments, which expand out to other parameters
            # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
            # (However, we defend the real command line against updates from either source)
            extraParameters = {}
            # AMI configuration?
            if 'AMIConfig' in self._argdict:
                msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
                from PyJobTransforms.trfAMI import TagInfo
                tag = TagInfo(self._argdict['AMIConfig'].value)
                updateDict = {}
                for k, v in dict(tag.trfs[0]).items():
                    # Convert to correct internal key form
                    k = cliToKey(k)
                    if inputFiles and k.startswith('input'):
                        msg.debug('Suppressing argument {0} from AMI'
                                  ' because input files have been specified on the command line'.format(k))
                        continue
                    if outputFiles and k.startswith('output'):
                        msg.debug('Suppressing argument {0} from AMI'
                                  ' because output files have been specified on the command line'.format(k))
                        continue
                    updateDict[k] = v
                extraParameters.update(updateDict)

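            # For illustration only: an AMI tag is passed on the command line as
            # e.g. "--AMIConfig q123" (the tag name here is hypothetical); any
            # parameter it expands to can still be overridden explicitly on the
            # command line, as the suppression logic above shows.
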
            # JSON arguments?
            if 'argJSON' in self._argdict:
                try:
                    import json
                    msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
                    # Use a context manager so the file is closed even if json.load() raises
                    with open(self._argdict['argJSON'], 'r') as argfile:
                        jsonParams = json.load(argfile)
                    msg.debug('Read: {0}'.format(jsonParams))
                    extraParameters.update(convertToStr(jsonParams))
                except Exception as e:
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))

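            # A hedged sketch of the expected file content (argument names are
            # illustrative): a flat JSON object mapping argument keys to values,
            # used as "--argJSON myargs.json":
            #
            #   {"maxEvents": 100, "outputESDFile": "out.ESD.pool.root"}
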
            # Event Service
            if 'eventService' in self._argdict and self._argdict['eventService'].value:
                updateDict = {}
                updateDict['athenaMPMergeTargetSize'] = '*:0'
                updateDict['checkEventCount'] = False
                updateDict['outputFileValidation'] = False
                extraParameters.update(updateDict)

            # Process anything we found
            # List of command line arguments
            argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
            for k, v in extraParameters.items():
                msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
                if k not in self.parser._argClass and k not in self.parser._argAlias:
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
                # Check if it is an alias
                if k in self.parser._argAlias:
                    msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
                    k = self.parser._argAlias[k]
                # Check if the argument has already been set on the command line
                if k in argsList:
                    msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
                    continue
                # For callable classes we instantiate properly, otherwise we set the value for simple arguments
                if '__call__' in dir(self.parser._argClass[k]):
                    self._argdict[k] = self.parser._argClass[k](v)
                else:
                    self._argdict[k] = v
                msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))

            # Set the key name as an argument property - useful to be able to look back at where this
            # argument came from
            for k, v in self._argdict.items():
                if isinstance(v, argument):
                    v.name = k
                elif isinstance(v, list):
                    for it in v:
                        if isinstance(it, argument):
                            it.name = k

            # Now that all arguments are parsed, if a pickle dump of the command line is requested do it here and exit
            if 'dumpPickle' in self._argdict:
                msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
                pickledDump(self._argdict)
                sys.exit(0)

            # Likewise for a JSON dump of the command line
            if 'dumpJSON' in self._argdict:
                msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
                JSONDump(self._argdict)
                sys.exit(0)

        except trfExceptions.TransformArgException as e:
            msg.critical('Argument parsing failure: {0!s}'.format(e))
            self._exitCode = e.errCode
            self._exitMsg = e.errMsg
            self._report.fast = True
            self.generateReport()
            sys.exit(self._exitCode)

        except trfExceptions.TransformAMIException as e:
            msg.critical('AMI failure: {0!s}'.format(e))
            self._exitCode = e.errCode
            self._exitMsg = e.errMsg
            sys.exit(self._exitCode)

        self.setGlobalLogLevel()


    ## @brief Check the transform argument dictionary and set the correct root logger option
    def setGlobalLogLevel(self):
        if 'verbose' in self._argdict:
            setRootLoggerLevel(stdLogLevels['DEBUG'])
        elif 'loglevel' in self._argdict:
            if self._argdict['loglevel'] in stdLogLevels:
                msg.info("Loglevel option found - setting root logger level to %s",
                         logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
                setRootLoggerLevel(stdLogLevels[self._argdict['loglevel']])
            else:
                msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
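
    # Hedged usage note: on the command line this corresponds to the standard
    # options "--verbose" / "--loglevel <LEVEL>"; the accepted level names are
    # the keys of @c stdLogLevels from PyJobTransforms.trfLogger.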


    ## @brief Execute transform
    #  @details This function calls the actual transform execution class and
    #  sets \c self.exitCode, \c self.exitMsg and \c self.processedEvents transform data members.
    #  @return None.
    def execute(self):
        msg.debug('Entering transform execution phase')

        try:
            # Intercept a few special options here
            if 'dumpargs' in self._argdict:
                self.parser.dumpArgs()
                sys.exit(0)

            # Graph stuff!
            msg.info('Resolving execution graph')
            self._setupGraph()

            if 'showSteps' in self._argdict:
                for exe in self._executors:
                    print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
                    if msg.level <= logging.DEBUG:
                        print(" {0} -> {1}".format(exe.inData, exe.outData))
                sys.exit(0)

            if 'showGraph' in self._argdict:
                print(self._executorGraph)
                sys.exit(0)

            # Graph stuff!
            msg.info('Starting to trace execution path')
            self._tracePath()
            msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
                                                                         ' '.join([exe['name'] for exe in self._executorPath])))

            if 'showPath' in self._argdict:
                msg.debug('Execution path list is: {0}'.format(self._executorPath))
                # Now print it nicely
                print('Executor path is:')
                for node in self._executorPath:
                    print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
                sys.exit(0)

            msg.debug('Execution path is {0}'.format(self._executorPath))

            # Prepare files for execution (separate method?)
            for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
                if dataType in self._dataDictionary:
                    msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
                else:
                    fileName = 'tmp.' + dataType
                    # How to pick the correct argFile class?
                    for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
                        stdArgName = prefix + dataType + suffix
                        if stdArgName in self.parser._argClass:
                            msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
                            self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
                            self._dataDictionary[dataType].io = 'temporary'
                            break
                    if dataType not in self._dataDictionary:
                        if 'HIST' in fileName:
                            self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
                        else:
                            self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
                            msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
                    self._dataDictionary[dataType].name = fileName

            # Do splitting if required
            self.setupSplitting()

            # Error if more than one executor in MPI mode
            if 'mpi' in self._argdict:
                if len(self._executorPath) > 1:
                    msg.error("MPI mode is not supported for jobs with more than one execution step!")
                    msg.error(f"We have {len(self._executorPath)}: {self._executorPath}")
                    sys.exit(1)

            # Now we can set the final executor configuration properly, with the final dataDictionary
            for executor in self._executors:
                executor.conf.setFromTransform(self)

            self.validateInFiles()

            for executionStep in self._executorPath:
                msg.debug('Now preparing to execute {0}'.format(executionStep))
                executor = self._executorDictionary[executionStep['name']]
                executor.preExecute(input = executionStep['input'], output = executionStep['output'])
                try:
                    executor.execute()
                    executor.postExecute()
                finally:
                    # Swap out the output files for the version with [] lists expanded
                    if 'mpi' in self._argdict:
                        new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig["outputs"]}
                        self._dataDictionary = new_data_dict
                        executor.conf._dataDictionary = new_data_dict
                    executor.validate()

            self._processedEvents = self.getProcessedEvents()
            self.validateOutFiles()

            msg.debug('Transform executor succeeded')
            self._exitCode = 0
            self._exitMsg = trfExit.codeToName(self._exitCode)

        except trfExceptions.TransformNeedCheckException as e:
            msg.warning('Transform executor signalled NEEDCHECK condition: {0}'.format(e.errMsg))
            self._exitCode = e.errCode
            self._exitMsg = e.errMsg
            self.generateReport(fast=False)

        except trfExceptions.TransformException as e:
            msg.critical('Transform executor raised %s: %s', e.__class__.__name__, e.errMsg)
            self._exitCode = e.errCode
            self._exitMsg = e.errMsg
            # Try to write a job report...
            self.generateReport(fast=True)

        finally:
            # Clean up any orphaned processes and exit here if things went bad
            infanticide(message=True)
            if self._exitCode:
                msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
                sys.exit(self._exitCode)

    ## @brief Setup the executor graph
    #  @note This function might need to be called again when the number of 'substeps' is unknown
    #  just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD
    #  steps they need to run until after digitisation.
    def _setupGraph(self):
        # Get input/output data
        self._inputData = list()
        self._outputData = list()

        for key, value in self._argdict.items():
            # Note the specifier [A-Za-z0-9_]+? makes this match non-greedy (avoids swallowing the optional 'File' suffix)
            m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
            # N.B. Protect against taking arguments which are not of type argFile
            if m:
                if isinstance(value, argFile):
                    if m.group(1) == 'input':
                        self._inputData.append(m.group(2))
                    else:
                        self._outputData.append(m.group(2))
                    self._dataDictionary[m.group(2)] = value
                elif isinstance(value, list) and value and isinstance(value[0], argFile):
                    if m.group(1) == 'input':
                        self._inputData.append(m.group(2))
                    else:
                        self._outputData.append(m.group(2))
                    self._dataDictionary[m.group(2)] = value

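        # Illustration of the match above (argument names are examples only):
        # 'inputESDFile' -> groups ('input', 'ESD', 'File'), giving data type 'ESD';
        # 'tmpRDO'       -> groups ('tmp', 'RDO', None),     giving data type 'RDO'.
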
        ## @note If we have no real data then add the pseudo datatype NULL, which allows us to manage
        #  transforms which can run without data
        if len(self._inputData) == 0:
            self._inputData.append('inNULL')
        if len(self._outputData) == 0:
            self._outputData.append('outNULL')
        msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))

        # Now see if we have any steering - manipulate the substep inputs and outputs before we
        # set up the graph
        if 'steering' in self._argdict:
            msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
            self._doSteering()

        # Set up the graph and topologically sort it
        self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
        self._executorGraph.doToposort()

    ## @brief Setup executor splitting
    def setupSplitting(self):
        if 'splitConfig' not in self._argdict:
            return

        split = []
        for executionStep in self._executorPath:
            baseStepName = executionStep['name']
            if baseStepName in split:
                continue

            baseExecutor = self._executorDictionary[baseStepName]
            splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
            if splitting <= 1:
                continue

            msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
            index = self._executorPath.index(executionStep)
            baseStep = self._executorPath.pop(index)
            for i in range(splitting):
                name = baseStepName + executorStepSuffix + str(i)
                step = copy.deepcopy(baseStep)
                step['name'] = name
                self._executorPath.insert(index + i, step)
                executor = copy.deepcopy(baseExecutor)
                executor.name = name
                executor.conf.executorStep = i
                executor.conf.totalExecutorSteps = splitting
                self._executors.add(executor)
                self._executorDictionary[name] = executor
                split.append(name)

    ## @brief Trace the path through the executor graph
    #  @note This function might need to be called again when the number of 'substeps' is unknown
    #  just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD
    #  steps they need to run until after digitisation.
    def _tracePath(self):
        self._executorGraph.findExecutionPath()

        self._executorPath = self._executorGraph.execution
        if len(self._executorPath) == 0:
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
                                                        'Execution path finding resulted in no substeps being executed '
                                                        '(Did you correctly specify input data for this transform?)')
        # Tell the first executor that it is the first
        self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True

    ## @brief Setup steering, which manipulates the graph before we trace the path
    #  for this transform
    #  @param steeringDict Manual steering dictionary (if specified, this is used instead of the
    #  steering from the @c steering argument - pay attention to the input structure!)
    def _doSteering(self, steeringDict = None):
        if not steeringDict:
            steeringDict = self._argdict['steering'].value
        for substep, steeringValues in steeringDict.items():
            foundSubstep = False
            for executor in self._executors:
                if executor.name == substep or executor.substep == substep:
                    foundSubstep = True
                    msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
                    # Steering consists of tuples with (in/out, +/-, datatype)
                    for steeringValue in steeringValues:
                        if steeringValue[0] == 'in':
                            startSet = executor.inData
                        else:
                            startSet = executor.outData
                        origLen = len(startSet)
                        msg.debug('Data values to be modified are: {0}'.format(startSet))
                        if steeringValue[1] == '+':
                            startSet.add(steeringValue[2])
                            if len(startSet) != origLen + 1:
                                raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                                                            'Attempting to add data type {0} to {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                        else:
                            startSet.discard(steeringValue[2])
                            if len(startSet) != origLen - 1:
                                raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                                                            'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                        msg.debug('Updated data values to: {0}'.format(startSet))
            if not foundSubstep:
                raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                                            'This transform has no executor/substep {0}'.format(substep))
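
    # A hedged illustration of the steering structure (substep and datatype names
    # are examples only): keys are executor/substep names, values are lists of
    # (in/out, +/-, datatype) tuples, e.g.
    #
    #   {'RAWtoESD': [('in', '-', 'RDO'), ('in', '+', 'RDO_TRIG')]}
    #
    # which removes RDO from, and adds RDO_TRIG to, the inputs of the RAWtoESD substep.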

    ## @brief Return the last executor which actually executed
    #  @return Last executor which has @c _hasExecuted == @c True, the very first executor if
    #  execution has not started yet, or @c None if the execution path has not been traced
    @property
    def lastExecuted(self):
        # Just make sure we have the path traced
        if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
            return None

        lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
        for executorStep in self._executorPath[1:]:
            if self._executorDictionary[executorStep['name']].hasExecuted:
                lastExecutor = self._executorDictionary[executorStep['name']]
        return lastExecutor


    ## @brief Transform report generator
    #  @param fast If True ensure that no external calls are made for file metadata (this is
    #  used to generate reports in a hurry after a crash or a forced exit)
    #  @param fileReport Dictionary giving the type of report to make for each type of file.
    #  This dictionary has to have all io types as keys and valid values are:
    #  @c None - skip this io type; @c 'full' - provide all details; @c 'name' - only dataset and
    #  filename will be reported on.
    #  @param reportType Iterable with report types to generate, otherwise a sensible default
    #  is used (~everything, plus the Tier0 report at Tier0)
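    #  For illustration, a fileReport dictionary might look like the following
    #  (a hedged sketch of the shape of @c defaultFileReport from
    #  PyJobTransforms.trfReports, with io types as keys):
    #  @code
    #  {'input': 'name', 'output': 'full', 'temporary': None}
    #  @endcode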
    def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
        msg.debug('Transform report generator')
        if 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
            msg.debug("Not in rank 0 -- not generating reports")
            return

        if 'reportType' in self._argdict:
            if reportType is not None:
                msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
            reportType = self._argdict['reportType'].value

        if reportType is None:
            reportType = ['json', ]
            # Only generate the Tier0 report at Tier0 ;-)
            # (It causes spurious warnings for some grid jobs with background files, e.g., digitisation)
            if 'TZHOME' in os.environ:
                reportType.append('gpickle')

            if not isInteractiveEnv():
                reportType.append('text')
                msg.debug('Detected non-interactive environment. Enabled text report')

        if 'reportName' in self._argdict:
            baseName = classicName = self._argdict['reportName'].value
        else:
            baseName = 'jobReport'
            classicName = 'metadata'

        try:
            # Text: writes environment variables and machine report in text format.
            if reportType is None or 'text' in reportType:
                envName = baseName if 'reportName' in self._argdict else 'env'  # Use the fallback name 'env.txt' if no name is specified
                self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
            # JSON
            if reportType is None or 'json' in reportType:
                self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
            # Classic XML
            if reportType is None or 'classic' in reportType:
                self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
            # Classic gPickle
            if reportType is None or 'gpickle' in reportType:
                self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
            # Pickled version of the JSON report for the pilot
            if reportType is None or 'pilotPickle' in reportType:
                self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)

        except trfExceptions.TransformTimeoutException as reportException:
            msg.error('Received timeout when writing report ({0})'.format(reportException))
            msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
            if ('orphanKiller' in self._argdict):
                infanticide(message=True, listOrphans=True)
            else:
                infanticide(message=True)
            sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))

        except trfExceptions.TransformException as reportException:
            # This is a bad one!
            msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
            msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
            msg.critical('Job reports are likely to be missing or incomplete - sorry')
            msg.critical('Please report this as a transforms bug!')
            msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
            msg.critical('Now exiting with a transform internal error code')
            if ('orphanKiller' in self._argdict):
                infanticide(message=True, listOrphans=True)
            else:
                infanticide(message=True)
            sys.exit(trfExit.nameToCode('TRF_INTERNAL'))


    # Description stolen from old trfs...
    ## @brief Common signal handler.
    #  @details This function is installed in place of the default signal handler and attempts to terminate the
    #  transform gracefully. When a signal is caught by the transform, the stdout from the running application process
    #  (i.e. @c athena.py) is allowed to continue uninterrupted and write its stdout to the log file (to retrieve
    #  the traceback) before the associated job report records the fact that a signal has been caught and completes
    #  the report accordingly.
    #  @param signum Signal number. Not used since this is a common handler assigned to predefined signals using
    #  @c _installSignalHandlers(). This param is still required to satisfy the requirements of @c signal.signal().
    #  @param frame Not used. Provided here to satisfy the requirements of @c signal.signal().
    #  @return Does not return. Raises SystemExit exception.
    #  @exception SystemExit()
    def _exitWithReport(self, signum, frame):
        msg.critical('Transform received signal {0}'.format(signum))
        msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
        # Shell convention: exit code is 128 + the signal number (e.g. SIGINT gives 130)
        self._exitCode = 128 + signum
        self._exitMsg = 'Transform received signal {0}'.format(signum)

        # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
        resetTrfSignalHandlers()

        msg.critical('Attempting to write reports with known information...')
        self.generateReport(fast=True)
        if ('orphanKiller' in self._argdict):
            infanticide(message=True, listOrphans=True)
        else:
            infanticide(message=True)

        sys.exit(self._exitCode)

    ## @brief Setter for the transform's validation dictionary
    #  @details This function updates the validation dictionary for the transform,
    #  updating values which are passed in the \c newValidationOptions argument.
    #  @param newValidationOptions Dictionary (or tuples) to update the validation
    #  dictionary with
    #  @return None
    def updateValidationDict(self, newValidationOptions):
        self.validation.update(newValidationOptions)

    ## @brief Getter function for the transform validation dictionary
    #  @return Validation dictionary
    def getValidationDict(self):
        return self.validation

    ## @brief Getter for a specific validation option
    #  @param key Validation dictionary key
    #  @return Validation key value or @c None if this key is absent
    def getValidationOption(self, key):
        if key in self.validation:
            return self.validation[key]
        else:
            return None

    ## @brief Return a list of fileArgs used by the transform
    #  @param io Filter files by their @c io attribute
    #  @return List of argFile instances
    def getFiles(self, io = None):
        res = []
        msg.debug('Looking for file arguments matching: io={0}'.format(io))
        for argName, arg in self._argdict.items():
            if isinstance(arg, argFile):
                msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
                if io is not None and arg.io != io:
                    continue
                msg.debug('Argument {0} matches criteria'.format(argName))
                res.append(arg)
        return res
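
    # Hedged usage sketch (variable names are illustrative): retrieve only the
    # output file arguments of a transform instance:
    #
    #   outputArgs = trf.getFiles(io='output')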


    def validateInFiles(self):
        if self._inFileValidationStart is None:
            self._inFileValidationStart = os.times()
            msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))

        if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
            ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
            ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
            ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
            ):
            msg.info('Standard input file validation turned off for transform %s.', self.name)
        else:
            msg.info('Validating input files')
            if 'parallelFileValidation' in self._argdict:
                trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value)
            else:
                trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')

        self._inFileValidationStop = os.times()
        msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))

    def validateOutFiles(self):
        if self._outFileValidationStart is None:
            self._outFileValidationStart = os.times()
            msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))

        if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
            ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
            ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
            ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
            ):
            msg.info('Standard output file validation turned off for transform %s.', self.name)
        elif 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
            msg.info("MPI mode and not in rank 0 ∴ not validating partial outputs")
        else:
            msg.info('Validating output files')
            parallelMode = False
            # Make multithreaded file validation the default
            multithreadedMode = True
            if 'parallelFileValidation' in self._argdict:
                parallelMode = self._argdict['parallelFileValidation'].value
            if 'multithreadedFileValidation' in self._argdict:
                multithreadedMode = self._argdict['multithreadedFileValidation'].value
            trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parallelMode, multithreadedMode=multithreadedMode)

        self._outFileValidationStop = os.times()
        msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))
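
# A hedged end-to-end sketch of how a transform script typically drives this class
# (the executor and argument names are illustrative, not taken from a real transform):
# construct, parse the command line, execute, write reports, and exit with the
# transform's exit code.
#
#   def main():
#       trf = transform(executor=transformExecutor(name='myStep'),
#                       description='Minimal example transform')
#       trf.parseCmdLineArgs(sys.argv[1:])
#       trf.execute()
#       trf.generateReport()
#       sys.exit(trf.exitCode)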