ATLAS Offline Software
Loading...
Searching...
No Matches
transform.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
9
10import argparse
11import copy
12import os
13import os.path as path
14import re
15import sys
16import traceback
17
18import logging
19msg = logging.getLogger(__name__)
20
21import PyJobTransforms.trfValidation as trfValidation
22import PyJobTransforms.trfExceptions as trfExceptions
23import PyJobTransforms.trfMPITools as trfMPITools
24
25from PyJobTransforms.trfSignal import setTrfSignalHandlers, resetTrfSignalHandlers
26from PyJobTransforms.trfArgs import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
27from PyJobTransforms.trfLogger import setRootLoggerLevel, stdLogLevels
28from PyJobTransforms.trfArgClasses import trfArgParser, argFile, argHISTFile, argument
29from PyJobTransforms.trfExeStepTools import executorStepSuffix, getTotalExecutorSteps
30from PyJobTransforms.trfExitCodes import trfExit
31from PyJobTransforms.trfUtils import shQuoteStrings, infanticide, pickledDump, JSONDump, cliToKey, convertToStr
32from PyJobTransforms.trfUtils import isInteractiveEnv, calcCpuTime, calcWallTime
33from PyJobTransforms.trfReports import trfJobReport, defaultFileReport
34from PyJobTransforms.trfExe import transformExecutor, athenaExecutor
35from PyJobTransforms.trfGraph import executorGraph
36
37
40class transform(object):
41
42
    def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
                 trfName = None, executor = None, exeArgs = None, description = ''):
        '''Transform class initialiser

        @param standardSignalHandlers If True, install the standard transform signal handlers
        @param standardTrfArgs If True, add the standard transform arguments to the parser
        @param standardValidationArgs If True, add validation and file validation arguments to the parser
        @param trfName Transform name; defaults to the invoking script's basename without the ".py" suffix
        @param executor A transformExecutor instance, or an iterable of them, to attach to this transform
        @param exeArgs Not referenced in this initialiser
        @param description Human-readable text appended to the argument parser description
        '''
        msg.debug('Welcome to ATLAS job transforms')

        ## @brief Get transform starting timestamp as early as possible
        self._transformStart = os.times()
        msg.debug('transformStart time is {0}'.format(self._transformStart))

        # File validation timing markers; set later by the validate*Files() methods
        self._inFileValidationStart = None
        self._inFileValidationStop = None
        self._outFileValidationStart = None
        self._outFileValidationStop = None

        ## @brief Get trf pre-data as early as possible
        self._trfPredata = os.environ.get('TRF_PREDATA')

        ## Transform _name
        self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]

        ## @note Holder for arguments this trf understands
        # Use @c argparse.SUPPRESS to have non-given arguments unset, rather than None
        # Support reading arguments from a file using the notation @c @file
        self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
                                   argument_default=argparse.SUPPRESS,
                                   fromfile_prefix_chars='@')

        if standardTrfArgs:
            addStandardTrfArgs(self.parser)

        if standardValidationArgs:
            addValidationArguments(self.parser)
            addFileValidationArguments(self.parser)


        ## Argument dictionary for this transform
        self._argdict = dict()

        ## Data dictionary place holder (this maps data types to their argFile instances)
        self._dataDictionary = dict()


        # Transform executor list - initalise with an empty set
        self._executors = set()
        self._executorDictionary = {}

        # Append the given executors or a default one to the set:
        # NOTE(review): the 'or' fallback only fires when executor is falsy but not None
        # (e.g. an empty set) - confirm whether a default transformExecutor is intended then
        if executor is not None:
            self.appendToExecutorSet(executor or {transformExecutor()})

        ## Transform exit code/message holders (filled in by execute()/signal handling)
        self._exitCode = None
        self._exitMsg = None

        ## Report object for this transform
        self._report = trfJobReport(parentTrf = self)

        ## Transform processed events
        self._processedEvents = None

        # Setup standard signal handling if asked; the handler writes a report then exits
        if standardSignalHandlers:
            setTrfSignalHandlers(self._exitWithReport)
            msg.debug('Standard signal handlers established')
111
112
113 @property
114 def name(self):
115 return self._name
116
117 @property
118 def exitCode(self):
119 if self._exitCode is None:
120 msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
121 return trfExit.nameToCode('TRF_UNKNOWN')
122 else:
123 return self._exitCode
124
125 @property
126 def exitMsg(self):
127 if self._exitMsg is None:
128 msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
129 return ''
130 else:
131 return self._exitMsg
132
    @property
    def argdict(self):
        """Dictionary of this transform's parsed arguments."""
        return self._argdict

    @property
    def dataDictionary(self):
        """Mapping from data type name to its file argument instance."""
        return self._dataDictionary

    @property
    def report(self):
        """The job report object attached to this transform."""
        return self._report

    @property
    def transformStart(self):
        """os.times() snapshot taken when the transform was constructed."""
        return self._transformStart
148
149 @property
150 def transformSetupCpuTime(self):
151 transformSetupCpuTime = None
152 if self._transformStart and self._inFileValidationStart:
153 transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)
154
155 return transformSetupCpuTime
156
157 @property
158 def transformSetupWallTime(self):
159 transformSetupWallTime = None
160 if self._transformStart and self._inFileValidationStart:
161 transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)
162
163 return transformSetupWallTime
164
165 @property
166 def inFileValidationCpuTime(self):
167 inFileValidationCpuTime = None
168 if self._inFileValidationStart and self._inFileValidationStop:
169 inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)
170
171 return inFileValidationCpuTime
172
173 @property
174 def inFileValidationWallTime(self):
175 inFileValidationWallTime = None
176 if self._inFileValidationStart and self._inFileValidationStop:
177 inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)
178
179 return inFileValidationWallTime
180
181 @property
182 def outFileValidationCpuTime(self):
183 outFileValidationCpuTime = None
184 if self._outFileValidationStart and self._outFileValidationStop:
185 outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)
186
187 return outFileValidationCpuTime
188
189 @property
190 def outFileValidationWallTime(self):
191 outFileValidationWallTime = None
192 if self._outFileValidationStart and self._outFileValidationStop:
193 outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)
194
195 return outFileValidationWallTime
196
    @property
    def outFileValidationStop(self):
        """os.times() snapshot taken when output file validation finished (None if it never ran)."""
        return self._outFileValidationStop

    @property
    def trfPredata(self):
        """Value of the TRF_PREDATA environment variable captured at construction (may be None)."""
        return self._trfPredata

    @property
    def executors(self):
        """Set of executors attached to this transform."""
        return self._executors

    @property
    def processedEvents(self):
        """Number of events processed by the transform (None until execution completes)."""
        return self._processedEvents
212
213 def getProcessedEvents(self):
214 nEvts = None
215 for executionStep in self._executorPath:
216 executor = self._executorDictionary[executionStep['name']]
217 if executor.conf.firstExecutor:
218 nEvts = executor.eventCount
219 return nEvts
220
221 def appendToExecutorSet(self, executors):
222 # Normalise to something iterable
223 if isinstance(executors, transformExecutor):
224 executors = [executors,]
225 elif not isinstance(executors, (list, tuple, set)):
226 raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
227 'Transform was initialised with an executor which was not a simple executor or an executor set')
228
229 # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
230 # Executor book keeping: set parent link back to me for all executors
231 # Also setup a dictionary, indexed by executor name and check that name is unique
232 ## Setting conf here not working - too early to get the dataDictionary
233 for executor in executors:
234 executor.trf = self
235 if executor.name in self._executorDictionary:
236 raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
237 'Transform has been initialised with two executors with the same name ({0})'
238 ' - executor names must be unique'.format(executor.name))
239 self._executors.add(executor)
240 self._executorDictionary[executor.name] = executor
241
242
243 ## @brief Parse command line arguments for a transform
244 def parseCmdLineArgs(self, args):
245 msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))
246
247 try:
248 # Use the argparse infrastructure to get the actual command line arguments
249 self._argdict=vars(self.parser.parse_args(args))
250
251 # Need to know if any input or output files were set - if so then we suppress the
252 # corresponding parameters from AMI
253 inputFiles = outputFiles = False
254 for k, v in self._argdict.items():
255 if k.startswith('input') and isinstance(v, argFile):
256 inputFiles = True
257 elif k.startswith('output') and isinstance(v, argFile):
258 outputFiles = True
259 msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))
260
261 # Now look for special arguments, which expand out to other parameters
262 # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
263 # (However, we defend the real command line against updates from either source)
264 extraParameters = {}
265 # AMI configuration?
266 if 'AMIConfig' in self._argdict:
267 msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
268 from PyJobTransforms.trfAMI import TagInfo
269 tag=TagInfo(self._argdict['AMIConfig'].value)
270 updateDict = {}
271 for k, v in dict(tag.trfs[0]).items():
272 # Convert to correct internal key form
273 k = cliToKey(k)
274 if inputFiles and k.startswith('input'):
275 msg.debug('Suppressing argument {0} from AMI'
276 ' because input files have been specified on the command line'.format(k))
277 continue
278 if outputFiles and k.startswith('output'):
279 msg.debug('Suppressing argument {0} from AMI'
280 ' because output files have been specified on the command line'.format(k))
281 continue
282 updateDict[k] = v
283 extraParameters.update(updateDict)
284
285 # JSON arguments?
286 if 'argJSON' in self._argdict:
287 try:
288 import json
289 msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
290 argfile = open(self._argdict['argJSON'], 'r')
291 jsonParams = json.load(argfile)
292 msg.debug('Read: {0}'.format(jsonParams))
293 extraParameters.update(convertToStr(jsonParams))
294 argfile.close()
295 except Exception as e:
296 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))
297
298 # Event Service
299 if 'eventService' in self._argdict and self._argdict['eventService'].value:
300 updateDict = {}
301 updateDict['athenaMPMergeTargetSize'] = '*:0'
302 updateDict['checkEventCount'] = False
303 updateDict['outputFileValidation'] = False
304 extraParameters.update(updateDict)
305
306 # Process anything we found
307 # List of command line arguments
308 argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
309 for k,v in extraParameters.items():
310 msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
311 if k not in self.parser._argClass and k not in self.parser._argAlias:
312 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
313 # Check if it is an alias
314 if k in self.parser._argAlias:
315 msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
316 k = self.parser._argAlias[k]
317 # Check if argument has already been set on the command line
318 if k in argsList:
319 msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
320 continue
321 # For callable classes we instantiate properly, otherwise we set the value for simple arguments
322 if '__call__' in dir(self.parser._argClass[k]):
323 self._argdict[k] = self.parser._argClass[k](v)
324 else:
325 self._argdict[k] = v
326 msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))
327
328 # Set the key name as an argument property - useful to be able to look bask at where this
329 # argument came from
330 for k, v in self._argdict.items():
331 if isinstance(v, argument):
332 v.name = k
333 elif isinstance(v, list):
334 for it in v:
335 if isinstance(it, argument):
336 it.name = k
337
338 # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
339 if 'dumpPickle' in self._argdict:
340 msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
341 pickledDump(self._argdict)
342 sys.exit(0)
343
344 # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
345 if 'dumpJSON' in self._argdict:
346 msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
347 JSONDump(self._argdict)
348 sys.exit(0)
349
350 except trfExceptions.TransformArgException as e:
351 msg.critical('Argument parsing failure: {0!s}'.format(e))
352 self._exitCode = e.errCode
353 self._exitMsg = e.errMsg
354 self._report.fast = True
355 self.generateReport()
356 sys.exit(self._exitCode)
357
358 except trfExceptions.TransformAMIException as e:
359 msg.critical('AMI failure: {0!s}'.format(e))
360 self._exitCode = e.errCode
361 self._exitMsg = e.errMsg
362 sys.exit(self._exitCode)
363
364 self.setGlobalLogLevel()
365
366
367 ## @brief Check transform argument dictionary and set the correct root logger option
368 def setGlobalLogLevel(self):
369 if 'verbose' in self._argdict:
370 setRootLoggerLevel(stdLogLevels['DEBUG'])
371 elif 'loglevel' in self._argdict:
372 if self._argdict['loglevel'] in stdLogLevels:
373 msg.info("Loglevel option found - setting root logger level to %s",
374 logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
375 setRootLoggerLevel(stdLogLevels[self._argdict['loglevel']])
376 else:
377 msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
378
379
    ## @brief Execute transform
    # @details This function calls the actual transform execution class and
    # sets \c self.exitCode, \c self.exitMsg and \c self.processedEvents transform data members.
    # @return None.
    def execute(self):
        msg.debug('Entering transform execution phase')

        # Warn if a CA-based transform has deprecated command line args;
        # print the warning pair only once across all executors
        deprecationWarningPrinted = False
        for exe in self._executors:
            if isinstance(exe, athenaExecutor):
                if (not deprecationWarningPrinted) and exe.skeletonCA:
                    toBeRemoved = ["autoConfiguration","trigStream","topOptions","valid"]
                    for deprecatedArg in toBeRemoved:
                        if deprecatedArg in self._argdict:
                            msg.warning("!!!Detected use of "+deprecatedArg+" in command line arguments for CA-based transform!!!")
                            msg.warning(deprecatedArg+" is DEPRECATED and due for removal. Applying it will not do anything, please remove it from your Transform definition.")
                            deprecationWarningPrinted = True
                            msg.debug(deprecatedArg+" detected in executor "+exe.name)

        try:
            # Intercept a few special options here: dump the known arguments and stop
            if 'dumpargs' in self._argdict:
                self.parser.dumpArgs()
                sys.exit(0)

            # Build the executor graph from the declared input/output data types
            msg.info('Resolving execution graph')
            self._setupGraph()

            # Informational options which print and exit before any real work
            if 'showSteps' in self._argdict:
                for exe in self._executors:
                    print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
                    if msg.level <= logging.DEBUG:
                        print(" {0} -> {1}".format(exe.inData, exe.outData))
                sys.exit(0)

            if 'showGraph' in self._argdict:
                print(self._executorGraph)
                sys.exit(0)

            # Trace the concrete execution path through the graph
            msg.info('Starting to trace execution path')
            self._tracePath()
            msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
                     ' '.join([exe['name'] for exe in self._executorPath])))

            if 'showPath' in self._argdict:
                msg.debug('Execution path list is: {0}'.format(self._executorPath))
                # Now print it nice
                print('Executor path is:')
                for node in self._executorPath:
                    print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
                sys.exit(0)

            msg.debug('Execution path is {0}'.format(self._executorPath))

            # Prepare files for execution: ensure every (non-NULL) graph data type
            # has an entry in the data dictionary, creating temporaries when needed
            for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
                if dataType in self._dataDictionary:
                    msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
                else:
                    fileName = 'tmp.' + dataType
                    # Try to pick the correct argFile subclass from the parser's known arguments
                    for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
                        stdArgName = prefix + dataType + suffix
                        if stdArgName in self.parser._argClass:
                            msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
                            self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
                            self._dataDictionary[dataType].io = 'temporary'
                            break
                    # No parser match: fall back to a generic (or HIST) temporary file argument
                    if dataType not in self._dataDictionary:
                        if 'HIST' in fileName:
                            self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())

                        else:
                            self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
                            msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
                        self._dataDictionary[dataType].name = fileName

            # Do splitting if required
            self.setupSplitting()

            # Error if more than one executor in MPI mode
            if 'mpi' in self._argdict:
                if len(self._executorPath) > 1:
                    msg.error("MPI mode is not supported for jobs with more than one execution step!")
                    msg.error(f"We have {len(self._executorPath)}: {self._executorPath}")
                    sys.exit(1)

            # Now we can set the final executor configuration properly, with the final dataDictionary
            for executor in self._executors:
                executor.conf.setFromTransform(self)

            self.validateInFiles()

            # Run every step of the traced path in order
            for executionStep in self._executorPath:
                msg.debug('Now preparing to execute {0}'.format(executionStep))
                executor = self._executorDictionary[executionStep['name']]
                executor.preExecute(input = executionStep['input'], output = executionStep['output'])
                try:
                    executor.execute()
                    executor.postExecute()
                finally:
                    # Swap out the output files for the version with [] lists expanded
                    if 'mpi' in self._argdict:
                        new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig["outputs"]}
                        self._dataDictionary = new_data_dict
                        executor.conf._dataDictionary = new_data_dict
                    executor.validate()

            self._processedEvents = self.getProcessedEvents()
            self.validateOutFiles()

            msg.debug('Transform executor succeeded')
            self._exitCode = 0
            self._exitMsg = trfExit.codeToName(self._exitCode)

        except trfExceptions.TransformNeedCheckException as e:
            msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
            self._exitCode = e.errCode
            self._exitMsg = e.errMsg
            self.generateReport(fast=False)

        except trfExceptions.TransformException as e:
            msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
            self._exitCode = e.errCode
            self._exitMsg = e.errMsg
            # Try and write a job report...
            self.generateReport(fast=True)

        finally:
            # Clean up any orphaned processes and exit here if things went bad
            infanticide(message=True)
            if self._exitCode:
                msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
                sys.exit(self._exitCode)
518
519 ## @brief Setup the executor graph
520 # @note This function might need to be called again when the number of 'substeps' is unknown
521 # just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD
522 # steps they need to run until after digitisation.
523 def _setupGraph(self):
524 # Get input/output data
525 self._inputData = list()
526 self._outputData = list()
527
528 for key, value in self._argdict.items():
529 # Note specifier [A-Za-z0-9_]+? makes this match non-greedy (avoid swallowing the optional 'File' suffix)
530 m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
531 # N.B. Protect against taking argunents which are not type argFile
532 if m:
533 if isinstance(value, argFile):
534 if m.group(1) == 'input':
535 self._inputData.append(m.group(2))
536 else:
537 self._outputData.append(m.group(2))
538 self._dataDictionary[m.group(2)] = value
539 elif isinstance(value, list) and value and isinstance(value[0], argFile):
540 if m.group(1) == 'input':
541 self._inputData.append(m.group(2))
542 else:
543 self._outputData.append(m.group(2))
544 self._dataDictionary[m.group(2)] = value
545
546 ## @note If we have no real data then add the pseudo datatype NULL, which allows us to manage
547 # transforms which can run without data
548 if len(self._inputData) == 0:
549 self._inputData.append('inNULL')
550 if len(self._outputData) == 0:
551 self._outputData.append('outNULL')
552 msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))
553
554 # Now see if we have any steering - manipulate the substep inputs and outputs before we
555 # setup the graph
556 if 'steering' in self._argdict:
557 msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
558 self._doSteering()
559
560 # Setup the graph and topo sort it
561 self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
562 self._executorGraph.doToposort()
563
564 ## @brief Setup executor splitting
565 def setupSplitting(self):
566 if 'splitConfig' not in self._argdict:
567 return
568
569 split = []
570 for executionStep in self._executorPath:
571 baseStepName = executionStep['name']
572 if baseStepName in split:
573 continue
574
575 baseExecutor = self._executorDictionary[baseStepName]
576 splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
577 if splitting <= 1:
578 continue
579
580 msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
581 index = self._executorPath.index(executionStep)
582 baseStep = self._executorPath.pop(index)
583 for i in range(splitting):
584 name = baseStepName + executorStepSuffix + str(i)
585 step = copy.deepcopy(baseStep)
586 step['name'] = name
587 self._executorPath.insert(index + i, step)
588 executor = copy.deepcopy(baseExecutor)
589 executor.name = name
590 executor.conf.executorStep = i
591 executor.conf.totalExecutorSteps = splitting
592 self._executors.add(executor)
593 self._executorDictionary[name] = executor
594 split.append(name)
595
596 ## @brief Trace the path through the executor graph
597 # @note This function might need to be called again when the number of 'substeps' is unknown
598 # just based on the input data types - e.g., DigiMReco jobs don't know how many RDOtoESD
599 # steps they need to run until after digitisation.
600 def _tracePath(self):
601 self._executorGraph.findExecutionPath()
602
603 self._executorPath = self._executorGraph.execution
604 if len(self._executorPath) == 0:
605 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
606 'Execution path finding resulted in no substeps being executed'
607 '(Did you correctly specify input data for this transform?)')
608 # Tell the first executor that they are the first
609 self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
610
    ## @brief Setup steering, which manipulates the graph before we trace the path
    # for this transform
    # @param steeringDict Manual steering dictionary (if specified, used instead of the
    # steering from the @c steering argument - pay attention to the input structure!
    # @raises trfExceptions.TransformSetupException (TRF_GRAPH_STEERING_ERROR) when a
    # requested add/remove does not change the data set size, or no executor matches a substep
    def _doSteering(self, steeringDict = None):
        if not steeringDict:
            steeringDict = self._argdict['steering'].value
        for substep, steeringValues in steeringDict.items():
            foundSubstep = False
            # A substep key may match an executor either by name or by its substep alias
            for executor in self._executors:
                if executor.name == substep or executor.substep == substep:
                    foundSubstep = True
                    msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
                    # Steering consists of tuples with (in/out, +/-, datatype)
                    for steeringValue in steeringValues:
                        if steeringValue[0] == 'in':
                            startSet = executor.inData
                        else:
                            startSet = executor.outData
                        # Remember the size so we can detect no-op adds/removes below
                        origLen = len(startSet)
                        msg.debug('Data values to be modified are: {0}'.format(startSet))
                        if steeringValue[1] == '+':
                            startSet.add(steeringValue[2])
                            # set.add is silent on duplicates, so an unchanged length means the type was already there
                            if len(startSet) != origLen + 1:
                                raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                    'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                        else:
                            startSet.discard(steeringValue[2])
                            # set.discard is silent on missing members, so an unchanged length means it was never present
                            if len(startSet) != origLen - 1:
                                raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                    'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                        msg.debug('Updated data values to: {0}'.format(startSet))
            if not foundSubstep:
                raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                    'This transform has no executor/substep {0}'.format(substep))
646
647
648 ## @brief Return the last executor which actually executed
649 # @return Last executor which has @c _hasExecuted == @c True, or the very first executor if we didn't even start yet
650 @property
651 def lastExecuted(self):
652 # Just make sure we have the path traced
653 if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
654 return None
655
656 lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
657 for executorStep in self._executorPath[1:]:
658 if self._executorDictionary[executorStep['name']].hasExecuted:
659 lastExecutor = self._executorDictionary[executorStep['name']]
660 return lastExecutor
661
662
    ## @brief Transform report generator
    # @param fast If True ensure that no external calls are made for file metadata (this is
    # used to generate reports in a hurry after a crash or a forced exit)
    # @param fileReport Dictionary giving the type of report to make for each type of file.
    # This dictionary has to have all io types as keys and valid values are:
    # @c None - skip this io type; @c 'full' - Provide all details; @c 'name' - only dataset and
    # filename will be reported on.
    # @param reportType Iterable with report types to generate, otherwise a sensible default
    # is used (~everything, plus the Tier0 report at Tier0)
    # @note Exits the process directly (TRF_METADATA_CALL_FAIL or TRF_INTERNAL) if report
    # writing times out or raises a transform exception.
    def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
        msg.debug('Transform report generator')
        # In MPI mode only one rank is responsible for report writing
        if 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
            msg.debug("Not in rank 0 -- not generating reports")
            return

        # A command line 'reportType' argument overrides whatever the caller requested
        if 'reportType' in self._argdict:
            if reportType is not None:
                msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
            reportType = self._argdict['reportType'].value

        if reportType is None:
            reportType = ['json', ]
            # Only generate the Tier0 report at Tier0 ;-)
            # (It causes spurious warnings for some grid jobs with background files (e.g., digitisation)
            if 'TZHOME' in os.environ:
                reportType.append('gpickle')

            if not isInteractiveEnv():
                reportType.append('text')
                msg.debug('Detected Non-Interactive environment. Enabled text report')

        if 'reportName' in self._argdict:
            baseName = classicName = self._argdict['reportName'].value
        else:
            baseName = 'jobReport'
            classicName = 'metadata'

        try:
            # Text: Writes environment variables and machine report in text format.
            if reportType is None or 'text' in reportType:
                envName = baseName if 'reportName' in self._argdict else 'env' # Use fallback name 'env.txt' if it's not specified.
                self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
            # JSON
            if reportType is None or 'json' in reportType:
                self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
            # Classic XML
            if reportType is None or 'classic' in reportType:
                self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
            # Classic gPickle
            if reportType is None or 'gpickle' in reportType:
                self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
            # Pickled version of the JSON report for pilot
            if reportType is None or 'pilotPickle' in reportType:
                self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)

        except trfExceptions.TransformTimeoutException as reportException:
            msg.error('Received timeout when writing report ({0})'.format(reportException))
            msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
            if ('orphanKiller' in self._argdict):
                infanticide(message=True, listOrphans=True)
            else:
                infanticide(message=True)
            sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))

        except trfExceptions.TransformException as reportException:
            # This is a bad one!
            msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
            msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
            msg.critical('Job reports are likely to be missing or incomplete - sorry')
            msg.critical('Please report this as a transforms bug!')
            msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
            msg.critical('Now exiting with a transform internal error code')
            if ('orphanKiller' in self._argdict):
                infanticide(message=True, listOrphans=True)
            else:
                infanticide(message=True)
            sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
740
741
    # Description stolen from old trfs...
    ## @brief Common signal handler.
    # @details This function is installed in place of the default signal handler and attempts to terminate the
    # transform gracefully. When a signal is caught by the transform, the stdout from the running application process
    # (i.e. @c athena.py) is allowed to continue uninterrupted and write it's stdout to the log file (to retrieve
    # the traceback) before the associated job report records the fact that a signal has been caught and complete
    # the report accordingly.
    # @param signum Signal number. Not used since this is a common handle assigned to predefined signals using the
    # @c _installSignalHandlers(). This param is still required to satisfy the requirements of @c signal.signal().
    # @param frame Not used. Provided here to satisfy the requirements of @c signal.signal().
    # @return Does not return. Raises SystemExit exception.
    # @exception SystemExit()
    def _exitWithReport(self, signum, frame):
        msg.critical('Transform received signal {0}'.format(signum))
        msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
        # Conventional shell exit code for "killed by signal N"
        self._exitCode = 128+signum
        self._exitMsg = 'Transform received signal {0}'.format(signum)

        # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
        resetTrfSignalHandlers()

        msg.critical('Attempting to write reports with known information...')
        self.generateReport(fast=True)
        # Kill off any child processes before leaving
        if ('orphanKiller' in self._argdict):
            infanticide(message=True, listOrphans=True)
        else:
            infanticide(message=True)

        sys.exit(self._exitCode)
771
772 ## @brief Setter for transform's validation dictionary
773 # @details This function updates the validation dictionary for the transform,
774 # updating values which are passed in the \c newValidationOptions argument.
775 # @param newValidationOptions Dictionary (or tuples) to update validation
776 # dictionary with
777 # @return None
778 def updateValidationDict(self, newValidationOptions):
779 self.validation.update(newValidationOptions)
780
781 ## @brief Getter function for transform validation dictionary
782 # @return Validiation dictionary
783 def getValidationDict(self):
784 return self.validation
785
786 ## @brief Getter for a specific validation option
787 # @param key Validation dictionary key
788 # @return Valdiation key value or @c None if this key is absent
789 def getValidationOption(self, key):
790 if key in self.validation:
791 return self.validation[key]
792 else:
793 return None
794
795 ## @brief Return a list of fileArgs used by the transform
796 # @param \c io Filter files by io attribute
797 # @return List of argFile instances
798 def getFiles(self, io = None):
799 res = []
800 msg.debug('Looking for file arguments matching: io={0}'.format(io))
801 for argName, arg in self._argdict.items():
802 if isinstance(arg, argFile):
803 msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
804 if io is not None and arg.io != io:
805 continue
806 msg.debug('Argument {0} matches criteria'.format(argName))
807 res.append(arg)
808 return res
809
810
811 def validateInFiles(self):
812 if self._inFileValidationStart is None:
813 self._inFileValidationStart = os.times()
814 msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))
815
816 if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
817 ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
818 ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
819 ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
820 ):
821 msg.info('Standard input file validation turned off for transform %s.', self.name)
822 else:
823 msg.info('Validating input files')
824 if 'parallelFileValidation' in self._argdict:
825 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
826 else:
827 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')
828
829 self._inFileValidationStop = os.times()
830 msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
831
832 def validateOutFiles(self):
833 if self._outFileValidationStart is None:
834 self._outFileValidationStart = os.times()
835 msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))
836
837 if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
838 ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
839 ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
840 ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
841 ):
842 msg.info('Standard output file validation turned off for transform %s.', self.name)
843 elif 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
844 msg.info("MPI mode and not in rank 0 ∴ not validating partial outputs")
845 else:
846 msg.info('Validating output files')
847 parparallelMode = False
848 # Make MT file validation default
849 parmultithreadedMode = True
850 if 'parallelFileValidation' in self._argdict:
851 parparallelMode = self._argdict['parallelFileValidation'].value
852 if 'multithreadedFileValidation' in self._argdict:
853 parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
854 trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)
855
856 self._outFileValidationStop = os.times()
857 msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))
Core transform class.
Definition transform.py:40
__init__(self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='')
Initialise a job transform.
Definition transform.py:48
Transform argument class definitions.
Transform execution functions.
Module for transform exit codes.
Transform graph utilities.
Logging configuration for ATLAS job transforms.
Utilities for handling MPI-Athena jobs.
Transform report classes and helper functions.
Signal handling utilities for ATLAS job transforms.
Transform utility functions.
Validation control for job transforms.