ATLAS Offline Software
transform.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 
9 
10 import argparse
11 import copy
12 import os
13 import os.path as path
14 import re
15 import sys
16 import traceback
17 
18 import logging
19 msg = logging.getLogger(__name__)
20 
21 import PyJobTransforms.trfValidation as trfValidation
22 import PyJobTransforms.trfExceptions as trfExceptions
23 import PyJobTransforms.trfMPITools as trfMPITools
24 
25 from PyJobTransforms.trfSignal import setTrfSignalHandlers, resetTrfSignalHandlers
26 from PyJobTransforms.trfArgs import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
27 from PyJobTransforms.trfLogger import setRootLoggerLevel, stdLogLevels
28 from PyJobTransforms.trfArgClasses import trfArgParser, argFile, argHISTFile, argument
29 from PyJobTransforms.trfExeStepTools import executorStepSuffix, getTotalExecutorSteps
30 from PyJobTransforms.trfExitCodes import trfExit
31 from PyJobTransforms.trfUtils import shQuoteStrings, infanticide, pickledDump, JSONDump, cliToKey, convertToStr
32 from PyJobTransforms.trfUtils import isInteractiveEnv, calcCpuTime, calcWallTime
33 from PyJobTransforms.trfReports import trfJobReport, defaultFileReport
34 from PyJobTransforms.trfExe import transformExecutor
35 from PyJobTransforms.trfGraph import executorGraph
36 
37 
41 
42 
def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
             trfName = None, executor = None, exeArgs = None, description = ''):
    '''Transform class initialiser

    @param standardSignalHandlers Install the standard transform signal handlers
    @param standardTrfArgs Add the standard transform arguments to the parser
    @param standardValidationArgs Add the standard validation arguments to the parser
    @param trfName Transform name; defaults to the script name without the '.py' suffix
    @param executor Executor, or iterable of executors, to attach to this transform
    @param exeArgs Executor argument list (not used in this initialiser)
    @param description Human readable description used in the argument parser banner
    '''
    msg.debug('Welcome to ATLAS job transforms')

    # Get the transform starting timestamp as early as possible
    self._transformStart = os.times()
    msg.debug('transformStart time is {0}'.format(self._transformStart))

    # Validation timing markers, filled in by validateInFiles/validateOutFiles.
    # NOTE(review): the extracted source only preserved _outFileValidationStop;
    # the other three markers are read by the timing properties and the
    # validate*Files methods, so they are restored here.
    self._inFileValidationStart = None
    self._inFileValidationStop = None
    self._outFileValidationStart = None
    self._outFileValidationStop = None

    # Get trf pre-data from the environment
    self._trfPredata = os.environ.get('TRF_PREDATA')

    # Transform _must_ have a name
    self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]

    # Argument parser for this transform
    self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
                               argument_default=argparse.SUPPRESS,
                               fromfile_prefix_chars='@')

    # NOTE(review): the bodies of these two conditionals were dropped by the
    # extraction; restored using the argument-helper functions imported above.
    if standardTrfArgs:
        addStandardTrfArgs(self.parser)

    if standardValidationArgs:
        addValidationArguments(self.parser)
        addFileValidationArguments(self.parser)

    # Argument dictionary for this transform
    self._argdict = dict()

    # Data dictionary (maps data types to their argFile instances)
    self._dataDictionary = dict()

    # Transform executor list - initialise with an empty set
    self._executors = set()
    self._executorDictionary = {}

    # Append the given executors or a default one to the set:
    if executor is not None:
        self.appendToExecutorSet(executor or {transformExecutor()})

    # Exit status holders - set at the end of execution or on error
    self._exitCode = None
    self._exitMsg = None

    # Job report object for this transform
    self._report = trfJobReport(parentTrf = self)

    # Number of processed events (filled in after execution)
    self._processedEvents = None

    # Setup standard signal handling if asked
    if standardSignalHandlers:
        setTrfSignalHandlers(self._exitWithReport)
        msg.debug('Standard signal handlers established')
112 
@property
def name(self):
    '''Transform name (script basename unless overridden at construction).'''
    return self._name

@property
def exitCode(self):
    '''Transform exit code; falls back to TRF_UNKNOWN while unset.'''
    if self._exitCode is not None:
        return self._exitCode
    msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
    return trfExit.nameToCode('TRF_UNKNOWN')

@property
def exitMsg(self):
    '''Transform exit message; falls back to the empty string while unset.'''
    if self._exitMsg is not None:
        return self._exitMsg
    msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
    return ''

@property
def argdict(self):
    '''Argument dictionary for this transform.'''
    return self._argdict

@property
def dataDictionary(self):
    '''Mapping from data types to their argFile instances.'''
    return self._dataDictionary

@property
def report(self):
    '''The job report object of this transform.'''
    return self._report

@property
def transformStart(self):
    '''os.times() snapshot taken when the transform started.'''
    return self._transformStart
148 
# NOTE(review): the `def` lines (and two guard `if` lines) of these properties
# were dropped by the documentation extraction; the names are restored from the
# surviving bodies and the module's cross-reference index.
@property
def transformSetupCpuTime(self):
    '''CPU time from transform start to input file validation start (None if unavailable).'''
    transformSetupCpuTime = None
    if self._transformStart and self._inFileValidationStart:
        transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)

    return transformSetupCpuTime

@property
def transformSetupWallTime(self):
    '''Wall time from transform start to input file validation start (None if unavailable).'''
    transformSetupWallTime = None
    if self._transformStart and self._inFileValidationStart:
        transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)

    return transformSetupWallTime

@property
def inFileValidationCpuTime(self):
    '''CPU time spent validating input files (None if validation did not run).'''
    inFileValidationCpuTime = None
    if self._inFileValidationStart and self._inFileValidationStop:
        inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)

    return inFileValidationCpuTime

@property
def inFileValidationWallTime(self):
    '''Wall time spent validating input files (None if validation did not run).'''
    inFileValidationWallTime = None
    if self._inFileValidationStart and self._inFileValidationStop:
        inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)

    return inFileValidationWallTime

@property
def outFileValidationCpuTime(self):
    '''CPU time spent validating output files (None if validation did not run).'''
    outFileValidationCpuTime = None
    if self._outFileValidationStart and self._outFileValidationStop:
        outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)

    return outFileValidationCpuTime

@property
def outFileValidationWallTime(self):
    '''Wall time spent validating output files (None if validation did not run).'''
    outFileValidationWallTime = None
    if self._outFileValidationStart and self._outFileValidationStop:
        outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)

    return outFileValidationWallTime

@property
def outFileValidationStop(self):
    '''os.times() snapshot taken when output file validation finished (None until then).'''
    return self._outFileValidationStop
200 
@property
def trfPredata(self):
    '''Value of the TRF_PREDATA environment variable captured at construction.'''
    return self._trfPredata

@property
def executors(self):
    '''The set of executors attached to this transform.'''
    return self._executors

@property
def processedEvents(self):
    '''Number of events processed (None until filled in).'''
    return self._processedEvents
212 
def getProcessedEvents(self):
    '''Return the event count reported by the first executor on the execution path.

    Walks the execution path and returns the eventCount of the executor
    flagged as firstExecutor; None if no such executor is found.
    NOTE(review): the `def` line was dropped by the extraction; the name is
    restored from the upstream transform interface.
    '''
    nEvts = None
    for executionStep in self._executorPath:
        executor = self._executorDictionary[executionStep['name']]
        if executor.conf.firstExecutor:
            nEvts = executor.eventCount
    return nEvts
220 
def appendToExecutorSet(self, executors):
    '''Attach one or more executors to this transform.

    Accepts a single transformExecutor or a list/tuple/set of them. Each
    executor gets its trf backlink set and is registered in the name
    dictionary. Raises TransformInternalException for unsupported input
    types or duplicate executor names.
    '''
    # Normalise to something iterable
    if isinstance(executors, transformExecutor):
        executors = [executors,]
    elif not isinstance(executors, (list, tuple, set)):
        raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
            'Transform was initialised with an executor which was not a simple executor or an executor set')

    # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
    # Executor book keeping: set parent link back to me for all executors
    # Also setup a dictionary, indexed by executor name and check that name is unique
    for exe in executors:
        exe.trf = self
        if exe.name in self._executorDictionary:
            raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                'Transform has been initialised with two executors with the same name ({0})'
                ' - executor names must be unique'.format(exe.name))
        self._executors.add(exe)
        self._executorDictionary[exe.name] = exe
241 
242 
243 
def parseCmdLineArgs(self, args):
    '''Parse command line arguments for this transform.

    Expands AMI tag configuration, JSON argument files and the event
    service special arguments into the argument dictionary (command line
    values always win), honours dumpPickle/dumpJSON requests, and finally
    applies the global log level. On argument or AMI errors a (fast) job
    report is written and the process exits with the error's code.

    @param args List of command line argument strings to parse
    '''
    msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))

    try:
        # Use the argparse infrastructure to get the actual command line arguments
        self._argdict = vars(self.parser.parse_args(args))

        # Need to know if any input or output files were set - if so then we suppress the
        # corresponding parameters from AMI
        inputFiles = outputFiles = False
        for k, v in self._argdict.items():
            if k.startswith('input') and isinstance(v, argFile):
                inputFiles = True
            elif k.startswith('output') and isinstance(v, argFile):
                outputFiles = True
        msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))

        # Now look for special arguments, which expand out to other parameters
        # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
        # (However, we defend the real command line against updates from either source)
        extraParameters = {}
        # AMI configuration?
        if 'AMIConfig' in self._argdict:
            msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
            from PyJobTransforms.trfAMI import TagInfo
            tag = TagInfo(self._argdict['AMIConfig'].value)
            updateDict = {}
            for k, v in dict(tag.trfs[0]).items():
                # Convert to correct internal key form
                k = cliToKey(k)
                if inputFiles and k.startswith('input'):
                    msg.debug('Suppressing argument {0} from AMI'
                              ' because input files have been specified on the command line'.format(k))
                    continue
                if outputFiles and k.startswith('output'):
                    msg.debug('Suppressing argument {0} from AMI'
                              ' because output files have been specified on the command line'.format(k))
                    continue
                updateDict[k] = v
            extraParameters.update(updateDict)

        # JSON arguments?
        if 'argJSON' in self._argdict:
            try:
                import json
                msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
                # Context manager guarantees the file is closed even on a parse error
                with open(self._argdict['argJSON'], 'r') as argfile:
                    jsonParams = json.load(argfile)
                msg.debug('Read: {0}'.format(jsonParams))
                extraParameters.update(convertToStr(jsonParams))
            except Exception as e:
                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))

        # Event Service
        if 'eventService' in self._argdict and self._argdict['eventService'].value:
            updateDict = {}
            updateDict['athenaMPMergeTargetSize'] = '*:0'
            updateDict['checkEventCount'] = False
            updateDict['outputFileValidation'] = False
            extraParameters.update(updateDict)

        # Process anything we found
        # List of command line arguments
        argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
        for k, v in extraParameters.items():
            msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
            if k not in self.parser._argClass and k not in self.parser._argAlias:
                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
            # Check if it is an alias
            if k in self.parser._argAlias:
                msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
                k = self.parser._argAlias[k]
            # Check if argument has already been set on the command line
            if k in argsList:
                msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
                continue
            # For callable classes we instantiate properly, otherwise we set the value for simple arguments
            if '__call__' in dir(self.parser._argClass[k]):
                self._argdict[k] = self.parser._argClass[k](v)
            else:
                self._argdict[k] = v
            msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))

        # Set the key name as an argument property - useful to be able to look back at where this
        # argument came from
        for k, v in self._argdict.items():
            if isinstance(v, argument):
                v.name = k
            elif isinstance(v, list):
                for it in v:
                    if isinstance(it, argument):
                        it.name = k

        # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
        if 'dumpPickle' in self._argdict:
            msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
            pickledDump(self._argdict)
            sys.exit(0)

        # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
        if 'dumpJSON' in self._argdict:
            msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
            JSONDump(self._argdict)
            sys.exit(0)

    # NOTE(review): the two except clauses below were dropped by the
    # documentation extraction (their handler bodies survived); restored to
    # match the surviving messages and exit handling.
    except trfExceptions.TransformArgException as e:
        msg.critical('Argument parsing failure: {0!s}'.format(e))
        self._exitCode = e.errCode
        self._exitMsg = e.errMsg
        self._report.fast = True
        self.generateReport()
        sys.exit(self._exitCode)

    except trfExceptions.TransformAMIException as e:
        msg.critical('AMI failure: {0!s}'.format(e))
        self._exitCode = e.errCode
        self._exitMsg = e.errMsg
        sys.exit(self._exitCode)

    self.setGlobalLogLevel()
365 
366 
367 
def setGlobalLogLevel(self):
    '''Check the transform argument dictionary and set the root logger level.

    The "verbose" flag wins over "loglevel"; an unrecognised loglevel
    value is ignored with a warning.
    '''
    if 'verbose' in self._argdict:
        setRootLoggerLevel(stdLogLevels['DEBUG'])
        return
    if 'loglevel' not in self._argdict:
        return
    requestedLevel = self._argdict['loglevel']
    if requestedLevel in stdLogLevels:
        msg.info("Loglevel option found - setting root logger level to %s",
                 logging.getLevelName(stdLogLevels[requestedLevel]))
        setRootLoggerLevel(stdLogLevels[requestedLevel])
    else:
        msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(requestedLevel))
379 
380 
def execute(self):
    '''Execute the transform.

    Resolves the executor graph, traces the execution path, prepares the
    data dictionary arguments, validates input files, runs each executor
    step (pre/execute/post/validate) and validates output files. Handles
    the showSteps/showGraph/showPath/dumpargs introspection options,
    which print and exit. On success _exitCode is 0; transform exceptions
    set the exit code/message and write a job report. Orphaned child
    processes are always cleaned up on the way out.
    '''
    msg.debug('Entering transform execution phase')

    try:
        # Intercept a few special options here
        if 'dumpargs' in self._argdict:
            self.parser.dumpArgs()
            sys.exit(0)

        # Graph stuff!
        msg.info('Resolving execution graph')
        self._setupGraph()

        if 'showSteps' in self._argdict:
            for exe in self._executors:
                print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
                if msg.level <= logging.DEBUG:
                    print(" {0} -> {1}".format(exe.inData, exe.outData))
            sys.exit(0)

        if 'showGraph' in self._argdict:
            print(self._executorGraph)
            sys.exit(0)

        # Graph stuff!
        msg.info('Starting to trace execution path')
        self._tracePath()
        msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
                 ' '.join([exe['name'] for exe in self._executorPath])))

        if 'showPath' in self._argdict:
            msg.debug('Execution path list is: {0}'.format(self._executorPath))
            # Now print it nice
            print('Executor path is:')
            for node in self._executorPath:
                print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
            sys.exit(0)

        msg.debug('Execution path is {0}'.format(self._executorPath))

        # Prepare files for execution (separate method?)
        for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
            if dataType in self._dataDictionary:
                msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
            else:
                fileName = 'tmp.' + dataType
                # How to pick the correct argFile class?
                for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
                    stdArgName = prefix + dataType + suffix
                    if stdArgName in self.parser._argClass:
                        msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
                        self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
                        self._dataDictionary[dataType].io = 'temporary'
                        break
                if dataType not in self._dataDictionary:
                    if 'HIST' in fileName:
                        self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
                    else:
                        self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
                    msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
                    self._dataDictionary[dataType].name = fileName

        # Do splitting if required
        self.setupSplitting()

        # Error if more than one executor in MPI mode
        if 'mpi' in self._argdict:
            if len(self._executorPath) > 1:
                msg.error("MPI mode is not supported for jobs with more than one execution step!")
                msg.error(f"We have {len(self._executorPath)}: {self._executorPath}")
                sys.exit(1)

        # Now we can set the final executor configuration properly, with the final dataDictionary
        for executor in self._executors:
            executor.conf.setFromTransform(self)

        self.validateInFiles()

        for executionStep in self._executorPath:
            msg.debug('Now preparing to execute {0}'.format(executionStep))
            executor = self._executorDictionary[executionStep['name']]
            executor.preExecute(input = executionStep['input'], output = executionStep['output'])
            try:
                executor.execute()
                executor.postExecute()
            finally:
                # Swap out the output files for the version with [] lists expanded
                if 'mpi' in self._argdict:
                    new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig["outputs"]}
                    self._dataDictionary = new_data_dict
                    executor.conf._dataDictionary = new_data_dict
                executor.validate()

        self.validateOutFiles()

        msg.debug('Transform executor succeeded')
        self._exitCode = 0
        self._exitMsg = trfExit.codeToName(self._exitCode)

    # NOTE(review): these two except clauses were dropped by the documentation
    # extraction (their handler bodies survived); restored to match the
    # surviving messages and report handling.
    except trfExceptions.TransformNeedCheckException as e:
        msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
        self._exitCode = e.errCode
        self._exitMsg = e.errMsg
        self.generateReport(fast=False)

    except trfExceptions.TransformException as e:
        msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
        self._exitCode = e.errCode
        self._exitMsg = e.errMsg
        # Try and write a job report...
        self.generateReport(fast=True)

    finally:
        # Clean up any orphaned processes and exit here if things went bad
        infanticide(message=True)
        if self._exitCode:
            msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
            sys.exit(self._exitCode)
504 
505 
def _setupGraph(self):
    '''Setup the executor graph.

    Scans the argument dictionary for file arguments to derive the
    transform's input and output data types, inserts NULL placeholders
    when either side is empty, applies any steering and topologically
    sorts the executor graph.
    '''
    # Get input/output data
    self._inputData = list()
    self._outputData = list()

    for key, value in self._argdict.items():
        # Note specifier [A-Za-z0-9_]+? makes this match non-greedy (avoid swallowing the optional 'File' suffix)
        m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
        if m is None:
            continue
        # N.B. Protect against taking arguments which are not of type argFile
        isFileArg = (isinstance(value, argFile) or
                     (isinstance(value, list) and value and isinstance(value[0], argFile)))
        if not isFileArg:
            continue
        dataType = m.group(2)
        if m.group(1) == 'input':
            self._inputData.append(dataType)
        else:
            self._outputData.append(dataType)
        self._dataDictionary[dataType] = value

    # Make sure the graph always has endpoints
    if len(self._inputData) == 0:
        self._inputData.append('inNULL')
    if len(self._outputData) == 0:
        self._outputData.append('outNULL')
    msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))

    # Now see if we have any steering - manipulate the substep inputs and outputs before we
    # setup the graph
    if 'steering' in self._argdict:
        msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
        self._doSteering()

    # Setup the graph and topo sort it
    self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
    self._executorGraph.doToposort()
549 
550 
def setupSplitting(self):
    '''Expand execution steps into multiple substeps when splitConfig is given.

    Each step whose executor reports more than one total step is replaced
    in the execution path by N copies, each backed by a deep-copied
    executor configured with its own step index. Steps produced by the
    split are remembered so they are not split again.
    '''
    if 'splitConfig' not in self._argdict:
        return

    alreadySplit = []
    for executionStep in self._executorPath:
        baseStepName = executionStep['name']
        if baseStepName in alreadySplit:
            continue

        baseExecutor = self._executorDictionary[baseStepName]
        splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
        if splitting <= 1:
            continue

        msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
        # Replace the base step in the path with one entry per substep
        index = self._executorPath.index(executionStep)
        baseStep = self._executorPath.pop(index)
        for i in range(splitting):
            name = baseStepName + executorStepSuffix + str(i)
            step = copy.deepcopy(baseStep)
            step['name'] = name
            self._executorPath.insert(index + i, step)
            executor = copy.deepcopy(baseExecutor)
            executor.name = name
            executor.conf.executorStep = i
            executor.conf.totalExecutorSteps = splitting
            self._executors.add(executor)
            self._executorDictionary[name] = executor
            alreadySplit.append(name)
581 
582 
def _tracePath(self):
    '''Trace the path through the executor graph and record it.

    Raises TransformSetupException when no viable execution path exists,
    then flags the first executor on the path as firstExecutor.
    '''
    self._executorGraph.findExecutionPath()

    self._executorPath = self._executorGraph.execution
    if not self._executorPath:
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
            'Execution path finding resulted in no substeps being executed'
            '(Did you correctly specify input data for this transform?)')
    # Tell the first executor that they are the first
    self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
596 
597 
def _doSteering(self, steeringDict = None):
    '''Apply steering to the executors, modifying their input/output data.

    @param steeringDict Mapping of substep name to steering tuples; when
    falsy the transform's own "steering" argument value is used. Each
    steering value is a (in|out, +|-, datatype) tuple: '+' adds the
    datatype to the executor's data set, '-' removes it. Raises
    TransformSetupException for unknown substeps or no-op add/remove.
    '''
    if not steeringDict:
        steeringDict = self._argdict['steering'].value
    for substep, steeringValues in steeringDict.items():
        foundSubstep = False
        for executor in self._executors:
            # Match on either the executor's name or its substep alias
            if executor.name != substep and executor.substep != substep:
                continue
            foundSubstep = True
            msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
            # Steering consists of tuples with (in/out, +/-, datatype)
            for steeringValue in steeringValues:
                if steeringValue[0] == 'in':
                    startSet = executor.inData
                else:
                    startSet = executor.outData
                origLen = len(startSet)
                msg.debug('Data values to be modified are: {0}'.format(startSet))
                if steeringValue[1] == '+':
                    startSet.add(steeringValue[2])
                    # A no-op add means the datatype was already present
                    if len(startSet) != origLen + 1:
                        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                            'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                else:
                    startSet.discard(steeringValue[2])
                    # A no-op discard means the datatype was never present
                    if len(startSet) != origLen - 1:
                        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                            'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                msg.debug('Updated data values to: {0}'.format(startSet))
        if not foundSubstep:
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                'This transform has no executor/substep {0}'.format(substep))
632 
633 
634 
@property
def lastExecuted(self):
    '''Return the last executor on the path which actually executed.

    Returns None when the execution path has not been traced yet;
    otherwise falls back to the first executor on the path.
    '''
    # Just make sure we have the path traced
    if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
        return None

    lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
    for executorStep in self._executorPath[1:]:
        candidate = self._executorDictionary[executorStep['name']]
        if candidate.hasExecuted:
            lastExecutor = candidate
    return lastExecutor
647 
648 
649 
def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
    '''Transform report generator.

    @param reportType Iterable of report types to write (json, text,
    classic, gpickle, pilotPickle); may be overridden by the reportType
    command line argument. Defaults to json plus environment-dependent
    extras.
    @param fast Generate a fast (minimal metadata) report
    @param fileReport Mapping controlling per-file reporting detail

    Exits the process with TRF_METADATA_CALL_FAIL on report timeout and
    TRF_INTERNAL on any other transform exception during writing. On
    non-rank-0 MPI workers no report is written at all.
    '''
    msg.debug('Transform report generator')
    if 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
        msg.debug("Not in rank 0 -- not generating reports")
        return

    # The reportType command line argument beats the caller's request
    if 'reportType' in self._argdict:
        if reportType is not None:
            msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
        reportType = self._argdict['reportType'].value

    if reportType is None:
        reportType = ['json', ]
        # Only generate the Tier0 report at Tier0 ;-)
        # (It causes spurious warnings for some grid jobs with background files (e.g., digitisation)
        if 'TZHOME' in os.environ:
            reportType.append('gpickle')

        if not isInteractiveEnv():
            reportType.append('text')
            msg.debug('Detected Non-Interactive environment. Enabled text report')

    if 'reportName' in self._argdict:
        baseName = classicName = self._argdict['reportName'].value
    else:
        baseName = 'jobReport'
        classicName = 'metadata'

    try:
        # Text: Writes environment variables and machine report in text format.
        if reportType is None or 'text' in reportType:
            envName = baseName if 'reportName' in self._argdict else 'env' # Use fallback name 'env.txt' if it's not specified.
            self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
        # JSON
        if reportType is None or 'json' in reportType:
            self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
        # Classic XML
        if reportType is None or 'classic' in reportType:
            self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
        # Classic gPickle
        if reportType is None or 'gpickle' in reportType:
            self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
        # Pickled version of the JSON report for pilot
        if reportType is None or 'pilotPickle' in reportType:
            self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)

    except trfExceptions.TransformTimeoutException as reportException:
        msg.error('Received timeout when writing report ({0})'.format(reportException))
        msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
        if 'orphanKiller' in self._argdict:
            infanticide(message=True, listOrphans=True)
        else:
            infanticide(message=True)
        sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))

    except trfExceptions.TransformException as reportException:
        # This is a bad one!
        msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
        msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
        msg.critical('Job reports are likely to be missing or incomplete - sorry')
        msg.critical('Please report this as a transforms bug!')
        msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
        msg.critical('Now exiting with a transform internal error code')
        if 'orphanKiller' in self._argdict:
            infanticide(message=True, listOrphans=True)
        else:
            infanticide(message=True)
        sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
726 
727 
728  # Description stolen from old trfs...
729 
def _exitWithReport(self, signum, frame):
    '''Signal handler: log the signal, write a fast report and exit.

    @param signum Signal number received
    @param frame Current stack frame at delivery time

    The exit code is the conventional 128+signum. Orphaned child
    processes are killed before exiting.
    '''
    msg.critical('Transform received signal {0}'.format(signum))
    msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
    self._exitCode = 128+signum
    self._exitMsg = 'Transform received signal {0}'.format(signum)

    # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
    # NOTE(review): this call was dropped by the extraction; restored to match
    # the comment above and the resetTrfSignalHandlers import at the top of
    # the module.
    resetTrfSignalHandlers()

    msg.critical('Attempting to write reports with known information...')
    self.generateReport(fast=True)
    if 'orphanKiller' in self._argdict:
        infanticide(message=True, listOrphans=True)
    else:
        infanticide(message=True)

    sys.exit(self._exitCode)
757 
758 
def updateValidationDict(self, newValidationOptions):
    '''Merge new options into this transform's validation dictionary.'''
    self.validation.update(newValidationOptions)


def getValidationDict(self):
    '''Return the transform's validation dictionary.'''
    return self.validation


def getValidationOption(self, key):
    '''Getter for a specific validation option; None when the key is absent.'''
    if key not in self.validation:
        return None
    return self.validation[key]
780 
781 
def getFiles(self, io = None):
    '''Return all argFile arguments of this transform, optionally filtered by io.

    @param io When given, only arguments whose io attribute equals this
    value (e.g. 'input', 'output', 'temporary') are returned.
    '''
    res = []
    msg.debug('Looking for file arguments matching: io={0}'.format(io))
    for argName, arg in self._argdict.items():
        if not isinstance(arg, argFile):
            continue
        msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
        if io is not None and arg.io != io:
            continue
        msg.debug('Argument {0} matches criteria'.format(argName))
        res.append(arg)
    return res
795 
796 
def validateInFiles(self):
    '''Validate transform input files, recording start/stop timestamps.

    Validation is skipped when any of the skip/disable arguments is set;
    parallel mode honours the parallelFileValidation argument.
    '''
    if self._inFileValidationStart is None:
        self._inFileValidationStart = os.times()
        msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))

    validationOff = (
        ('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
        ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
        ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
        ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
    )
    if validationOff:
        msg.info('Standard input file validation turned off for transform %s.', self.name)
    else:
        msg.info('Validating input files')
        if 'parallelFileValidation' in self._argdict:
            trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
        else:
            trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')

    self._inFileValidationStop = os.times()
    msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
817 
def validateOutFiles(self):
    '''Validate transform output files, recording start/stop timestamps.

    Validation is skipped when disabled via arguments, or on non-rank-0
    MPI workers. Parallel and multithreaded modes honour the
    corresponding arguments (multithreaded validation is the default).
    '''
    if self._outFileValidationStart is None:
        self._outFileValidationStart = os.times()
        msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))

    validationOff = (
        ('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
        ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
        ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
        ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
    )
    if validationOff:
        msg.info('Standard output file validation turned off for transform %s.', self.name)
    elif 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
        msg.info("MPI mode and not in rank 0 ∴ not validating partial outputs")
    else:
        msg.info('Validating output files')
        parallelMode = False
        # Make MT file validation default
        multithreadedMode = True
        if 'parallelFileValidation' in self._argdict:
            parallelMode = self._argdict['parallelFileValidation'].value
        if 'multithreadedFileValidation' in self._argdict:
            multithreadedMode = self._argdict['multithreadedFileValidation'].value
        trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parallelMode, multithreadedMode=multithreadedMode)

    self._outFileValidationStop = os.times()
    msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))
PyJobTransforms.trfSignal
Signal handling utilities for ATLAS job transforms.
python.trfUtils.isInteractiveEnv
def isInteractiveEnv()
Definition: trfUtils.py:703
python.trfExceptions.TransformNeedCheckException
Exception used when the job wants to signal that it should get manual intervention at Tier-0.
Definition: trfExceptions.py:91
PyJobTransforms.trfAMI
Utilities for configuration of transforms via AMI tags.
python.transform.transform.dataDictionary
def dataDictionary(self)
Definition: transform.py:138
python.trfUtils.pickledDump
def pickledDump(argdict)
Dump a list of arguments to the pickle file given in the 'dumpPickle' argument.
Definition: trfUtils.py:598
python.transform.transform._argdict
_argdict
Argument dictionary for this transform.
Definition: transform.py:82
python.transform.transform._setupGraph
def _setupGraph(self)
Setup the executor graph.
Definition: transform.py:509
python.trfExceptions.TransformSetupException
Setup exceptions.
Definition: trfExceptions.py:42
python.transform.transform.execute
def execute(self)
Execute transform.
Definition: transform.py:384
vtune_athena.format
format
Definition: vtune_athena.py:14
PyJobTransforms.trfGraph
Transform graph utilities.
python.transform.transform.lastExecuted
def lastExecuted(self)
Return the last executor which actually executed.
Definition: transform.py:637
python.trfArgs.addValidationArguments
def addValidationArguments(parser)
Definition: trfArgs.py:504
python.transform.transform._outputData
_outputData
Definition: transform.py:512
python.transform.transform.report
def report(self)
Definition: transform.py:142
python.transform.transform._tracePath
def _tracePath(self)
Trace the path through the executor graph.
Definition: transform.py:586
index
Definition: index.py:1
python.trfSignal.resetTrfSignalHandlers
def resetTrfSignalHandlers()
Restore signal handlers to the default ones.
Definition: trfSignal.py:40
python.trfUtils.calcCpuTime
def calcCpuTime(start, stop)
Definition: trfUtils.py:1684
python.trfExceptions.TransformArgException
Group of argument based exceptions.
Definition: trfExceptions.py:38
python.transform.transform.parser
parser
Definition: transform.py:69
PyJobTransforms.trfReports
Transform report classes and helper functions.
python.transform.transform.outFileValidationWallTime
def outFileValidationWallTime(self)
Definition: transform.py:190
python.transform.transform._dataDictionary
_dataDictionary
Data dictionary placeholder (this maps data types to their argFile instances)
Definition: transform.py:85
python.transform.transform.inFileValidationWallTime
def inFileValidationWallTime(self)
Definition: transform.py:174
python.transform.transform.outFileValidationCpuTime
def outFileValidationCpuTime(self)
Definition: transform.py:182
python.transform.transform
Core transform class.
Definition: transform.py:40
python.transform.transform.getValidationOption
def getValidationOption(self, key)
Getter for a specific validation option.
Definition: transform.py:775
PyJobTransforms.trfArgClasses
Transform argument class definitions.
python.transform.transform.exitCode
def exitCode(self)
Definition: transform.py:118
python.trfArgs.addStandardTrfArgs
def addStandardTrfArgs(parser)
Add standard transform arguments to an argparse ArgumentParser.
Definition: trfArgs.py:16
python.transform.transform.setGlobalLogLevel
def setGlobalLogLevel(self)
Check transform argument dictionary and set the correct root logger option.
Definition: transform.py:368
python.transform.transform.inFileValidationCpuTime
def inFileValidationCpuTime(self)
Definition: transform.py:166
PyJobTransforms.trfExitCodes
Module for transform exit codes.
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.transform.transform.transformSetupCpuTime
def transformSetupCpuTime(self)
Definition: transform.py:150
python.transform.transform._outFileValidationStop
_outFileValidationStop
Definition: transform.py:58
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
python.transform.transform.getValidationDict
def getValidationDict(self)
Getter function for transform validation dictionary.
Definition: transform.py:769
python.transform.transform.generateReport
def generateReport(self, reportType=None, fast=False, fileReport=defaultFileReport)
Transform report generator.
Definition: transform.py:658
python.transform.transform._trfPredata
_trfPredata
Get trf pre-data as early as possible.
Definition: transform.py:61
python.trfLogger.setRootLoggerLevel
def setRootLoggerLevel(level)
Change the logging level of the root logger.
Definition: trfLogger.py:56
python.transform.transform._processedEvents
_processedEvents
Transform processed events.
Definition: transform.py:104
python.transform.transform._inputData
_inputData
Definition: transform.py:511
python.transform.transform.exitMsg
def exitMsg(self)
Definition: transform.py:126
python.trfUtils.cliToKey
def cliToKey(option)
Convert a command line option to the dictionary key that will be used by argparse.
Definition: trfUtils.py:652
python.transform.transform.validateOutFiles
def validateOutFiles(self)
Definition: transform.py:818
python.transform.transform.validateInFiles
def validateInFiles(self)
Definition: transform.py:797
TagInfo
This class contains the list of currently valid tags for detector description - GeoModel and IOV/Cond...
Definition: TagInfo.h:41
python.transform.transform.outFileValidationStop
def outFileValidationStop(self)
Definition: transform.py:198
python.transform.transform.updateValidationDict
def updateValidationDict(self, newValidationOptions)
Setter for transform's validation dictionary.
Definition: transform.py:764
python.transform.transform._outFileValidationStart
_outFileValidationStart
Definition: transform.py:57
python.transform.transform._inFileValidationStart
_inFileValidationStart
Definition: transform.py:55
python.trfExceptions.TransformAMIException
Exception used by configuration via AMI tags.
Definition: trfExceptions.py:86
python.trfExeStepTools.getTotalExecutorSteps
def getTotalExecutorSteps(executor, argdict=None)
Definition: trfExeStepTools.py:29
python.transform.transform.transformStart
def transformStart(self)
Definition: transform.py:146
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:194
add
bool add(const std::string &hname, TKey *tobj)
Definition: fastadd.cxx:55
python.trfUtils.convertToStr
def convertToStr(in_string)
Recursively convert unicode to str, useful when we have just loaded something from json (TODO: make t...
Definition: trfUtils.py:636
python.trfExceptions.TransformInternalException
Transform internal errors.
Definition: trfExceptions.py:74
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.transform.transform._exitWithReport
def _exitWithReport(self, signum, frame)
Common signal handler.
Definition: transform.py:740
PyJobTransforms.trfValidation
Validation control for job transforms.
python.transform.transform.name
def name(self)
Definition: transform.py:114
python.transform.transform.setupSplitting
def setupSplitting(self)
Setup executor splitting.
Definition: transform.py:551
beamspotman.dir
string dir
Definition: beamspotman.py:621
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
python.trfExceptions.TransformTimeoutException
Exception used by time limited executions.
Definition: trfExceptions.py:78
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
python.transform.transform.parseCmdLineArgs
def parseCmdLineArgs(self, args)
Parse command line arguments for a transform.
Definition: transform.py:244
python.transform.transform.trfPredata
def trfPredata(self)
Definition: transform.py:202
python.transform.transform.getFiles
def getFiles(self, io=None)
Return a list of fileArgs used by the transform.
Definition: transform.py:784
python.transform.transform.transformSetupWallTime
def transformSetupWallTime(self)
Definition: transform.py:158
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.transform.transform._exitMsg
_exitMsg
Definition: transform.py:98
python.transform.transform._doSteering
def _doSteering(self, steeringDict=None)
Setup steering, which manipulates the graph before we trace the path for this transform.
Definition: transform.py:601
TrigJetMonitorAlgorithm.items
items
Definition: TrigJetMonitorAlgorithm.py:71
python.transform.transform._executorDictionary
_executorDictionary
Definition: transform.py:90
python.transform.transform.appendToExecutorSet
def appendToExecutorSet(self, executors)
Definition: transform.py:221
python.trfSignal.setTrfSignalHandlers
def setTrfSignalHandlers(handler)
Install common handler for various signals.
Definition: trfSignal.py:28
python.trfUtils.calcWallTime
def calcWallTime(start, stop)
Definition: trfUtils.py:1692
python.trfArgs.addFileValidationArguments
def addFileValidationArguments(parser)
Definition: trfArgs.py:484
python.transform.transform._transformStart
_transformStart
Get transform starting timestamp as early as possible.
Definition: transform.py:52
PyJobTransforms.trfExe
Transform execution functions.
python.trfExceptions.TransformException
Base class for transform exceptions.
Definition: trfExceptions.py:14
Trk::open
@ open
Definition: BinningType.h:40
PyJobTransforms.trfMPITools
Utilities for handling MPI-Athena jobs.
python.transform.transform._executors
_executors
Definition: transform.py:89
python.trfUtils.JSONDump
def JSONDump(argdict)
Dump a list of arguments to the JSON file given in the 'dumpJSON' argument.
Definition: trfUtils.py:617
PyJobTransforms.trfUtils
Transform utility functions.
python.transform.transform._executorGraph
_executorGraph
Definition: transform.py:547
PyJobTransforms.trfLogger
Logging configuration for ATLAS job transforms.
python.transform.transform._executorPath
_executorPath
Definition: transform.py:589
confTool.parse_args
def parse_args()
Definition: confTool.py:36
python.transform.transform.__init__
def __init__(self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='')
Initialise a job transform.
Definition: transform.py:47
python.transform.transform._inFileValidationStop
_inFileValidationStop
Definition: transform.py:56
pickleTool.object
object
Definition: pickleTool.py:29
python.transform.transform.executors
def executors(self)
Definition: transform.py:206
str
Definition: BTagTrackIpAccessor.cxx:11
python.transform.transform._report
_report
Report object for this transform.
Definition: transform.py:101
python.transform.transform.argdict
def argdict(self)
Definition: transform.py:134
python.trfUtils.shQuoteStrings
def shQuoteStrings(strArray=sys.argv)
Quote a string array so that it can be echoed back on the command line in a cut 'n' paste safe way.
Definition: trfUtils.py:361
python.transform.transform.getProcessedEvents
def getProcessedEvents(self)
Definition: transform.py:213
python.transform.transform.processedEvents
def processedEvents(self)
Definition: transform.py:210
python.transform.transform._exitCode
_exitCode
Transform exit code/message holders.
Definition: transform.py:97
python.transform.transform._name
_name
Transform _name.
Definition: transform.py:64
python.trfUtils.infanticide
def infanticide(childPIDs=None, sleepTime=3, message=True, listOrphans=False)
Kill all PIDs.
Definition: trfUtils.py:132