ATLAS Offline Software
transform.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 
9 
10 import argparse
11 import copy
12 import os
13 import os.path as path
14 import re
15 import sys
16 import traceback
17 
18 import logging
19 msg = logging.getLogger(__name__)
20 
21 import PyJobTransforms.trfValidation as trfValidation
22 import PyJobTransforms.trfExceptions as trfExceptions
23 
24 from PyJobTransforms.trfSignal import setTrfSignalHandlers, resetTrfSignalHandlers
25 from PyJobTransforms.trfArgs import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
26 from PyJobTransforms.trfLogger import setRootLoggerLevel, stdLogLevels
27 from PyJobTransforms.trfArgClasses import trfArgParser, argFile, argHISTFile, argument
28 from PyJobTransforms.trfExeStepTools import executorStepSuffix, getTotalExecutorSteps
29 from PyJobTransforms.trfExitCodes import trfExit
30 from PyJobTransforms.trfUtils import shQuoteStrings, infanticide, pickledDump, JSONDump, cliToKey, convertToStr
31 from PyJobTransforms.trfUtils import isInteractiveEnv, calcCpuTime, calcWallTime
32 from PyJobTransforms.trfReports import trfJobReport, defaultFileReport
33 from PyJobTransforms.trfExe import transformExecutor
34 from PyJobTransforms.trfGraph import executorGraph
35 
36 
40 
41 
def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
             trfName = None, executor = None, exeArgs = None, description = ''):
    '''Transform class initialiser.

    @param standardSignalHandlers Install the standard transform signal handlers when True
    @param standardTrfArgs Add the standard transform arguments to the parser when True
    @param standardValidationArgs Add the standard validation arguments to the parser when True
    @param trfName Transform name; defaults to the invoking script's basename without ".py"
    @param executor A transformExecutor, or an iterable of them, to attach to this transform
    @param exeArgs Unused here; kept for interface compatibility
    @param description Human-readable transform description used in --help output
    '''
    msg.debug('Welcome to ATLAS job transforms')

    # Get the transform start timestamp as early as possible
    self._transformStart = os.times()
    msg.debug('transformStart time is {0}'.format(self._transformStart))

    # File validation timing markers, set by validateInFiles()/validateOutFiles().
    # NOTE(review): restored from extraction-dropped lines - both validation methods
    # test these attributes against None, so they must be initialised here; confirm upstream.
    self._inFileValidationStart = None
    self._inFileValidationStop = None
    self._outFileValidationStart = None
    self._outFileValidationStop = None

    # Snapshot of any pre-transform data passed through the environment
    self._trfPredata = os.environ.get('TRF_PREDATA')

    # Transform name, from the argument or derived from the script name
    self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]

    # Argument parser; SUPPRESS keeps unset options out of the argument dictionary,
    # '@' allows arguments to be read from a file
    self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
                               argument_default=argparse.SUPPRESS,
                               fromfile_prefix_chars='@')

    if standardTrfArgs:
        # NOTE(review): call restored from an extraction-dropped line;
        # grounded by the addStandardTrfArgs import at the top of the file.
        addStandardTrfArgs(self.parser)

    if standardValidationArgs:
        # NOTE(review): calls restored from extraction-dropped lines; grounded by the
        # addValidationArguments/addFileValidationArguments imports - confirm upstream.
        addValidationArguments(self.parser)
        addFileValidationArguments(self.parser)

    # Argument dictionary for this transform (filled by parseCmdLineArgs())
    self._argdict = dict()

    # Data dictionary (maps data types to their argFile instances)
    self._dataDictionary = dict()

    # Transform executor list - initialise with an empty set
    self._executors = set()
    self._executorDictionary = {}

    # Append the given executors or a default one to the set:
    if executor is not None:
        self.appendToExecutorSet(executor or {transformExecutor()})

    # Exit status; stays None until execution/parsing determines it
    self._exitCode = None
    self._exitMsg = None

    # Report object for this transform
    self._report = trfJobReport(parentTrf = self)

    # Count of processed events (None until known)
    self._processedEvents = None

    # Setup standard signal handling if asked
    if standardSignalHandlers:
        # NOTE(review): handler installation restored from an extraction-dropped line;
        # grounded by the setTrfSignalHandlers import and _exitWithReport below - confirm.
        setTrfSignalHandlers(self._exitWithReport)
        msg.debug('Standard signal handlers established')
111 
@property
def name(self):
    '''Transform name (read only).'''
    return self._name
115 
@property
def exitCode(self):
    '''Transform exit code; falls back to TRF_UNKNOWN when it was never set.'''
    if self._exitCode is None:
        msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
        return trfExit.nameToCode('TRF_UNKNOWN')
    return self._exitCode
123 
@property
def exitMsg(self):
    '''Transform exit message; empty string when it was never set.'''
    if self._exitMsg is None:
        msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
        return ''
    return self._exitMsg
131 
@property
def argdict(self):
    '''Argument dictionary of this transform (read only).'''
    return self._argdict
135 
@property
def dataDictionary(self):
    '''Data dictionary mapping data types to their file arguments (read only).'''
    return self._dataDictionary
139 
@property
def report(self):
    '''Job report object of this transform (read only).'''
    return self._report
143 
@property
def transformStart(self):
    '''os.times() snapshot taken when the transform started (read only).'''
    return self._transformStart
147 
@property
def transformSetupCpuTime(self):
    '''CPU time from transform start to the start of input file validation (None if unknown).'''
    # NOTE(review): 'def' line restored from an extraction-dropped line;
    # the name is confirmed by the Doxygen index entry for this property.
    transformSetupCpuTime = None
    if self._transformStart and self._inFileValidationStart:
        transformSetupCpuTime = calcCpuTime(self._transformStart, self._inFileValidationStart)

    return transformSetupCpuTime
155 
@property
def transformSetupWallTime(self):
    '''Wall time from transform start to the start of input file validation (None if unknown).'''
    # NOTE(review): 'def' line restored from an extraction-dropped line;
    # the name is taken from the local variable used in the body.
    transformSetupWallTime = None
    if self._transformStart and self._inFileValidationStart:
        transformSetupWallTime = calcWallTime(self._transformStart, self._inFileValidationStart)

    return transformSetupWallTime
163 
@property
def inFileValidationCpuTime(self):
    '''CPU time spent validating input files (None if validation has not run).'''
    # NOTE(review): 'def' line and guard condition restored from extraction-dropped
    # lines, by analogy with transformSetupCpuTime above - confirm upstream.
    inFileValidationCpuTime = None
    if self._inFileValidationStart and self._inFileValidationStop:
        inFileValidationCpuTime = calcCpuTime(self._inFileValidationStart, self._inFileValidationStop)

    return inFileValidationCpuTime
171 
@property
def inFileValidationWallTime(self):
    '''Wall time spent validating input files (None if validation has not run).'''
    # NOTE(review): 'def' line and guard condition restored from extraction-dropped
    # lines, by analogy with transformSetupWallTime above - confirm upstream.
    inFileValidationWallTime = None
    if self._inFileValidationStart and self._inFileValidationStop:
        inFileValidationWallTime = calcWallTime(self._inFileValidationStart, self._inFileValidationStop)

    return inFileValidationWallTime
179 
@property
def outFileValidationCpuTime(self):
    '''CPU time spent validating output files (None if validation has not run).'''
    # NOTE(review): 'def' line and guard condition restored from extraction-dropped
    # lines, by analogy with the input validation timers above - confirm upstream.
    outFileValidationCpuTime = None
    if self._outFileValidationStart and self._outFileValidationStop:
        outFileValidationCpuTime = calcCpuTime(self._outFileValidationStart, self._outFileValidationStop)

    return outFileValidationCpuTime
187 
@property
def outFileValidationWallTime(self):
    '''Wall time spent validating output files (None if validation has not run).'''
    # NOTE(review): 'def' line and guard condition restored from extraction-dropped
    # lines, by analogy with the input validation timers above - confirm upstream.
    outFileValidationWallTime = None
    if self._outFileValidationStart and self._outFileValidationStop:
        outFileValidationWallTime = calcWallTime(self._outFileValidationStart, self._outFileValidationStop)

    return outFileValidationWallTime
195 
@property
def outFileValidationStop(self):
    '''os.times() snapshot taken when output file validation finished (or None).'''
    # NOTE(review): 'def' line restored after extraction loss; the property name is
    # inferred from the attribute it returns - confirm against upstream source.
    return self._outFileValidationStop
199 
@property
def trfPredata(self):
    '''Value of the TRF_PREDATA environment variable captured at construction (read only).'''
    return self._trfPredata
203 
@property
def executors(self):
    '''Set of executors attached to this transform (read only).'''
    return self._executors
207 
@property
def processedEvents(self):
    '''Cached processed-event count (read only, None until known).'''
    return self._processedEvents
211 
def getProcessedEvents(self):
    '''Return the event count of the first executor on the execution path (None if unknown).'''
    # NOTE(review): 'def' line restored after extraction loss; method name inferred
    # from the surrounding code - confirm against upstream source.
    nEvts = None
    for executionStep in self._executorPath:
        executor = self._executorDictionary[executionStep['name']]
        if executor.conf.firstExecutor:
            nEvts = executor.eventCount
    return nEvts
219 
def appendToExecutorSet(self, executors):
    '''Add one executor, or an iterable of executors, to this transform.

    Sets each executor's parent transform link and registers it in the
    name-indexed executor dictionary.

    @param executors A transformExecutor instance or a list/tuple/set of them
    @raise trfExceptions.TransformInternalException for a wrong argument type
           or a duplicate executor name
    '''
    # Normalise to something iterable
    if isinstance(executors, transformExecutor):
        executors = [executors,]
    elif not isinstance(executors, (list, tuple, set)):
        raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
            'Transform was initialised with an executor which was not a simple executor or an executor set')

    # TRY TO DEPRECATE SETTING trf IN THE EXECUTOR - USE CONF!
    # Executor book keeping: set parent link back to me for all executors
    # Also setup a dictionary, indexed by executor name and check that name is unique
    for executor in executors:
        executor.trf = self
        if executor.name in self._executorDictionary:
            raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                'Transform has been initialised with two executors with the same name ({0})'
                ' - executor names must be unique'.format(executor.name))
        self._executors.add(executor)
        self._executorDictionary[executor.name] = executor
240 
241 
242 
def parseCmdLineArgs(self, args):
    '''Parse command line arguments for the transform.

    Expands meta-arguments (AMIConfig, argJSON, eventService) into ordinary
    arguments, resolving aliases and letting explicit command-line values win.
    Handles the dump-and-exit options (dumpPickle/dumpJSON) and finally applies
    the requested log level. Exits the process on argument or AMI errors.

    @param args List of raw command line argument strings
    '''
    msg.info('Transform command line was: %s', ' '.join(shQuoteStrings(sys.argv)))

    try:
        # Use the argparse infrastructure to get the actual command line arguments
        self._argdict = vars(self.parser.parse_args(args))

        # Need to know if any input or output files were set - if so then we suppress the
        # corresponding parameters from AMI
        inputFiles = outputFiles = False
        for k, v in self._argdict.items():
            if k.startswith('input') and isinstance(v, argFile):
                inputFiles = True
            elif k.startswith('output') and isinstance(v, argFile):
                outputFiles = True
        msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))

        # Now look for special arguments, which expand out to other parameters
        # Note that the pickled argdict beats AMIConfig because dict.update() will overwrite
        # (However, we defend the real command line against updates from either source)
        extraParameters = {}

        # AMI configuration?
        if 'AMIConfig' in self._argdict:
            msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
            from PyJobTransforms.trfAMI import TagInfo
            tag = TagInfo(self._argdict['AMIConfig'].value)
            updateDict = {}
            for k, v in dict(tag.trfs[0]).items():
                # Convert to correct internal key form
                k = cliToKey(k)
                if inputFiles and k.startswith('input'):
                    msg.debug('Suppressing argument {0} from AMI'
                              ' because input files have been specified on the command line'.format(k))
                    continue
                if outputFiles and k.startswith('output'):
                    msg.debug('Suppressing argument {0} from AMI'
                              ' because output files have been specified on the command line'.format(k))
                    continue
                updateDict[k] = v
            extraParameters.update(updateDict)

        # JSON arguments?
        if 'argJSON' in self._argdict:
            try:
                import json
                msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
                # Context manager guarantees the file is closed even on a parse error
                with open(self._argdict['argJSON'], 'r') as argfile:
                    jsonParams = json.load(argfile)
                msg.debug('Read: {0}'.format(jsonParams))
                extraParameters.update(convertToStr(jsonParams))
            except Exception as e:
                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Error when deserialising JSON file {0} ({1})'.format(self._argdict['argJSON'], e))

        # Event Service
        if 'eventService' in self._argdict and self._argdict['eventService'].value:
            updateDict = {}
            updateDict['athenaMPMergeTargetSize'] = '*:0'
            updateDict['checkEventCount'] = False
            updateDict['outputFileValidation'] = False
            extraParameters.update(updateDict)

        # Process anything we found
        # List of argument names given explicitly on the command line - these always win
        argsList = [i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
        for k, v in extraParameters.items():
            msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
            if k not in self.parser._argClass and k not in self.parser._argAlias:
                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'), 'Argument "{0}" not known (try "--help")'.format(k))
            # Check if it is an alias
            if k in self.parser._argAlias:
                msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
                k = self.parser._argAlias[k]
            # Check if argument has already been set on the command line
            if k in argsList:
                msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
                continue
            # For callable classes we instantiate properly, otherwise we set the value for simple arguments
            if '__call__' in dir(self.parser._argClass[k]):
                self._argdict[k] = self.parser._argClass[k](v)
            else:
                self._argdict[k] = v
            msg.debug('Argument {0} set to {1}'.format(k, self._argdict[k]))

        # Set the key name as an argument property - useful to be able to look back at where this
        # argument came from
        for k, v in self._argdict.items():
            if isinstance(v, argument):
                v.name = k
            elif isinstance(v, list):
                for it in v:
                    if isinstance(it, argument):
                        it.name = k

        # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
        if 'dumpPickle' in self._argdict:
            msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
            pickledDump(self._argdict)
            sys.exit(0)

        # Now we parsed all arguments, if a pickle/json dump is requested do it here and exit
        if 'dumpJSON' in self._argdict:
            msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
            JSONDump(self._argdict)
            sys.exit(0)

    # NOTE(review): the two 'except' headers below were reconstructed (lost in extraction);
    # the exception types are inferred from the handler messages and the trfExceptions
    # classes referenced elsewhere in this file - confirm against upstream source.
    except trfExceptions.TransformArgException as e:
        msg.critical('Argument parsing failure: {0!s}'.format(e))
        self._exitCode = e.errCode
        self._exitMsg = e.errMsg
        self._report.fast = True
        self.generateReport()
        sys.exit(self._exitCode)

    except trfExceptions.TransformAMIException as e:
        msg.critical('AMI failure: {0!s}'.format(e))
        self._exitCode = e.errCode
        self._exitMsg = e.errMsg
        sys.exit(self._exitCode)

    self.setGlobalLogLevel()
364 
365 
366 
def setGlobalLogLevel(self):
    '''Check the argument dictionary and set the root logger level accordingly.

    'verbose' forces DEBUG; otherwise an explicit 'loglevel' is applied when it
    is one of the recognised stdLogLevels, and ignored (with a warning) otherwise.
    '''
    if 'verbose' in self._argdict:
        setRootLoggerLevel(stdLogLevels['DEBUG'])
    elif 'loglevel' in self._argdict:
        if self._argdict['loglevel'] in stdLogLevels:
            msg.info("Loglevel option found - setting root logger level to %s",
                     logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
            setRootLoggerLevel(stdLogLevels[self._argdict['loglevel']])
        else:
            msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
378 
379 
def execute(self):
    '''Execute the transform.

    Resolves the executor graph, traces the execution path, prepares the data
    dictionary, validates input files, runs every executor on the path
    (preExecute / execute / postExecute / validate) and validates output files.
    The 'dumpargs', 'showSteps', 'showGraph' and 'showPath' arguments
    short-circuit with sys.exit(0). On a non-zero exit code the process exits
    from the finally block after killing orphaned child processes.
    '''
    msg.debug('Entering transform execution phase')

    try:
        # Intercept a few special options here
        if 'dumpargs' in self._argdict:
            self.parser.dumpArgs()
            sys.exit(0)

        # Graph stuff!
        msg.info('Resolving execution graph')
        self._setupGraph()

        if 'showSteps' in self._argdict:
            for exe in self._executors:
                print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
                if msg.level <= logging.DEBUG:
                    print(" {0} -> {1}".format(exe.inData, exe.outData))
            sys.exit(0)

        if 'showGraph' in self._argdict:
            print(self._executorGraph)
            sys.exit(0)

        # Graph stuff!
        msg.info('Starting to trace execution path')
        self._tracePath()
        msg.info('Execution path found with {0} step(s): {1}'.format(len(self._executorPath),
                 ' '.join([exe['name'] for exe in self._executorPath])))

        if 'showPath' in self._argdict:
            msg.debug('Execution path list is: {0}'.format(self._executorPath))
            # Now print it nice
            print('Executor path is:')
            for node in self._executorPath:
                print(' {0}: {1} -> {2}'.format(node['name'], list(node['input']), list(node['output'])))
            sys.exit(0)

        msg.debug('Execution path is {0}'.format(self._executorPath))

        # Prepare files for execution (separate method?)
        for dataType in [data for data in self._executorGraph.data if 'NULL' not in data]:
            if dataType in self._dataDictionary:
                msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
            else:
                fileName = 'tmp.' + dataType
                # How to pick the correct argFile class?
                for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
                    stdArgName = prefix + dataType + suffix
                    if stdArgName in self.parser._argClass:
                        msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
                        self._dataDictionary[dataType] = self.parser._argClass[stdArgName](fileName)
                        self._dataDictionary[dataType].io = 'temporary'
                        break
                if dataType not in self._dataDictionary:
                    if 'HIST' in fileName:
                        self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
                    else:
                        self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
                        msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
                    self._dataDictionary[dataType].name = fileName

        # Do splitting if required
        self.setupSplitting()

        # Now we can set the final executor configuration properly, with the final dataDictionary
        for executor in self._executors:
            executor.conf.setFromTransform(self)

        self.validateInFiles()

        for executionStep in self._executorPath:
            msg.debug('Now preparing to execute {0}'.format(executionStep))
            executor = self._executorDictionary[executionStep['name']]
            executor.preExecute(input = executionStep['input'], output = executionStep['output'])
            try:
                executor.execute()
                executor.postExecute()
            finally:
                # Always validate this step, even if execute/postExecute raised
                executor.validate()

        self.validateOutFiles()

        msg.debug('Transform executor succeeded')
        self._exitCode = 0
        self._exitMsg = trfExit.codeToName(self._exitCode)

    # NOTE(review): the two 'except' headers below were reconstructed (lost in extraction);
    # the NEEDCHECK handler matches trfExceptions.TransformNeedCheckException and the
    # general handler the trfExceptions.TransformException base - confirm upstream.
    except trfExceptions.TransformNeedCheckException as e:
        msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
        self._exitCode = e.errCode
        self._exitMsg = e.errMsg
        self.generateReport(fast=False)

    except trfExceptions.TransformException as e:
        msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
        self._exitCode = e.errCode
        self._exitMsg = e.errMsg
        # Try and write a job report...
        self.generateReport(fast=True)

    finally:
        # Clean up any orphaned processes and exit here if things went bad
        infanticide(message=True)
        if self._exitCode:
            msg.warning('Transform now exiting early with exit code {0} ({1})'.format(self._exitCode, self._exitMsg))
            sys.exit(self._exitCode)
491 
492 
def _setupGraph(self):
    '''Setup the executor graph.

    Scans the argument dictionary for argFile-type arguments (or lists of
    argFiles) to derive the transform's input and output data types, applies any
    'steering' modifications, then builds and topologically sorts the graph.
    '''
    # Get input/output data
    self._inputData = list()
    self._outputData = list()

    for key, value in self._argdict.items():
        # Note specifier [A-Za-z0-9_]+? makes this match non-greedy (avoid swallowing the optional 'File' suffix)
        m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
        # N.B. Protect against taking arguments which are not type argFile
        if m:
            if isinstance(value, argFile):
                if m.group(1) == 'input':
                    self._inputData.append(m.group(2))
                else:
                    self._outputData.append(m.group(2))
                self._dataDictionary[m.group(2)] = value
            elif isinstance(value, list) and value and isinstance(value[0], argFile):
                if m.group(1) == 'input':
                    self._inputData.append(m.group(2))
                else:
                    self._outputData.append(m.group(2))
                self._dataDictionary[m.group(2)] = value

    # Add placeholder data types so the graph always has endpoints
    if len(self._inputData) == 0:
        self._inputData.append('inNULL')
    if len(self._outputData) == 0:
        self._outputData.append('outNULL')
    msg.debug('Transform has this input data: {0}; output data {1}'.format(self._inputData, self._outputData))

    # Now see if we have any steering - manipulate the substep inputs and outputs before we
    # setup the graph
    if 'steering' in self._argdict:
        msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
        self._doSteering()

    # Setup the graph and topo sort it
    self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
    self._executorGraph.doToposort()
536 
537 
def setupSplitting(self):
    '''Expand execution steps into multiple substeps when 'splitConfig' requests it.

    For each step whose executor reports a splitting factor > 1, the step is
    replaced in the execution path by deep-copied substeps named
    <base><executorStepSuffix><i>, each with its step index and the total count
    recorded in its executor configuration. No-op without 'splitConfig'.
    '''
    if 'splitConfig' not in self._argdict:
        return

    # Names of substeps already created, so we never re-split our own products
    split = []
    for executionStep in self._executorPath:
        baseStepName = executionStep['name']
        if baseStepName in split:
            continue

        baseExecutor = self._executorDictionary[baseStepName]
        splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
        if splitting <= 1:
            continue

        msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
        index = self._executorPath.index(executionStep)
        baseStep = self._executorPath.pop(index)
        for i in range(splitting):
            name = baseStepName + executorStepSuffix + str(i)
            step = copy.deepcopy(baseStep)
            step['name'] = name
            self._executorPath.insert(index + i, step)
            executor = copy.deepcopy(baseExecutor)
            executor.name = name
            executor.conf.executorStep = i
            executor.conf.totalExecutorSteps = splitting
            self._executors.add(executor)
            self._executorDictionary[name] = executor
            split.append(name)
568 
569 
def _tracePath(self):
    '''Trace the execution path through the executor graph.

    Stores the result in _executorPath and marks the first executor on the path.

    @raise trfExceptions.TransformSetupException if no execution path was found
    '''
    self._executorGraph.findExecutionPath()

    self._executorPath = self._executorGraph.execution
    if len(self._executorPath) == 0:
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
            'Execution path finding resulted in no substeps being executed'
            '(Did you correctly specify input data for this transform?)')
    # Tell the first executor that they are the first
    self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
583 
584 
def _doSteering(self, steeringDict = None):
    '''Apply steering modifications to the executors' input/output data sets.

    @param steeringDict Mapping from substep name to a list of
           (in/out, +/-, datatype) tuples; defaults to the 'steering' argument value
    @raise trfExceptions.TransformSetupException when an add/remove has no effect
           or the substep does not match any executor
    '''
    if not steeringDict:
        steeringDict = self._argdict['steering'].value
    for substep, steeringValues in steeringDict.items():
        foundSubstep = False
        for executor in self._executors:
            if executor.name == substep or executor.substep == substep:
                foundSubstep = True
                msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
                # Steering consists of tuples with (in/out, +/-, datatype)
                for steeringValue in steeringValues:
                    if steeringValue[0] == 'in':
                        startSet = executor.inData
                    else:
                        startSet = executor.outData
                    origLen = len(startSet)
                    msg.debug('Data values to be modified are: {0}'.format(startSet))
                    if steeringValue[1] == '+':
                        startSet.add(steeringValue[2])
                        # Length check detects a no-op add (datatype already present)
                        if len(startSet) != origLen + 1:
                            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                    else:
                        startSet.discard(steeringValue[2])
                        # Length check detects a no-op remove (datatype was absent)
                        if len(startSet) != origLen - 1:
                            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                    msg.debug('Updated data values to: {0}'.format(startSet))
        if not foundSubstep:
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                'This transform has no executor/substep {0}'.format(substep))
619 
620 
621 
@property
def lastExecuted(self):
    '''Return the last executor on the execution path which actually executed.

    Returns None when the execution path was never traced or is empty; returns
    the first executor when none report hasExecuted.
    '''
    # Just make sure we have the path traced
    if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
        return None

    lastExecutor = self._executorDictionary[self._executorPath[0]['name']]
    for executorStep in self._executorPath[1:]:
        if self._executorDictionary[executorStep['name']].hasExecuted:
            lastExecutor = self._executorDictionary[executorStep['name']]
    return lastExecutor
634 
635 
636 
def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
    '''Transform report generator.

    Writes the job report in the requested formats. With no explicit type a JSON
    report is always written; a gpickle report is added at Tier0 (TZHOME set) and
    a text report in non-interactive environments. A 'reportType' argument on the
    command line overrides the reportType parameter.

    @param reportType List of report formats ('text', 'json', 'classic',
           'gpickle', 'pilotPickle') or None for the defaults described above
    @param fast Generate a fast (less complete) report
    @param fileReport Per-file report detail levels, keyed by io type
    '''
    msg.debug('Transform report generator')

    if 'reportType' in self._argdict:
        if reportType is not None:
            msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
        reportType = self._argdict['reportType'].value

    if reportType is None:
        reportType = ['json', ]
        # Only generate the Tier0 report at Tier0 ;-)
        # (It causes spurious warnings for some grid jobs with background files (e.g., digitisation)
        if 'TZHOME' in os.environ:
            reportType.append('gpickle')

        if not isInteractiveEnv():
            reportType.append('text')
            msg.debug('Detected Non-Interactive environment. Enabled text report')

    if 'reportName' in self._argdict:
        baseName = classicName = self._argdict['reportName'].value
    else:
        baseName = 'jobReport'
        classicName = 'metadata'

    try:
        # Text: Writes environment variables and machine report in text format.
        if reportType is None or 'text' in reportType:
            envName = baseName if 'reportName' in self._argdict else 'env'  # Use fallback name 'env.txt' if it's not specified.
            self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
        # JSON
        if reportType is None or 'json' in reportType:
            self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
        # Classic XML
        if reportType is None or 'classic' in reportType:
            self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
        # Classic gPickle
        if reportType is None or 'gpickle' in reportType:
            self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
        # Pickled version of the JSON report for pilot
        if reportType is None or 'pilotPickle' in reportType:
            self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)

    except trfExceptions.TransformTimeoutException as reportException:
        msg.error('Received timeout when writing report ({0})'.format(reportException))
        msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
        if ('orphanKiller' in self._argdict):
            infanticide(message=True, listOrphans=True)
        else:
            infanticide(message=True)
        sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))

    except trfExceptions.TransformException as reportException:
        # This is a bad one!
        msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
        msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
        msg.critical('Job reports are likely to be missing or incomplete - sorry')
        msg.critical('Please report this as a transforms bug!')
        msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
        msg.critical('Now exiting with a transform internal error code')
        if ('orphanKiller' in self._argdict):
            infanticide(message=True, listOrphans=True)
        else:
            infanticide(message=True)
        sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
710 
711 
712  # Description stolen from old trfs...
713 
def _exitWithReport(self, signum, frame):
    '''Signal handler: log the signal, write a fast report, kill orphans and exit.

    The process exits with the conventional code 128 + signum.

    @param signum Signal number received
    @param frame Interrupted stack frame (used only for the logged stack trace)
    '''
    msg.critical('Transform received signal {0}'.format(signum))
    msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
    self._exitCode = 128 + signum
    self._exitMsg = 'Transform received signal {0}'.format(signum)

    # Reset signal handlers now - we don't want to recurse if the same signal arrives again (e.g. multiple ^C)
    # NOTE(review): this call was restored from an extraction-dropped line; it matches the
    # comment above and the resetTrfSignalHandlers import at the top of the file - confirm.
    resetTrfSignalHandlers()

    msg.critical('Attempting to write reports with known information...')
    self.generateReport(fast=True)
    if ('orphanKiller' in self._argdict):
        infanticide(message=True, listOrphans=True)
    else:
        infanticide(message=True)

    sys.exit(self._exitCode)
741 
742 
def updateValidationDict(self, newValidationOptions):
    '''Merge new options into the transform validation dictionary.

    @param newValidationOptions Mapping of validation option names to values;
           existing keys are overwritten
    '''
    self.validation.update(newValidationOptions)
750 
751 
def getValidationDict(self):
    '''Return the transform validation dictionary.'''
    return self.validation
755 
756 
def getValidationOption(self, key):
    '''Return a specific validation option, or None when the key is unknown.

    @param key Validation option name to look up
    '''
    # dict.get reproduces the original if/else: value when present, else None
    return self.validation.get(key)
764 
765 
def getFiles(self, io = None):
    '''Return all argFile-type arguments from the argument dictionary.

    @param io Optional io filter; when given, only files whose io attribute
           matches are returned
    @return List of matching argFile instances
    '''
    res = []
    msg.debug('Looking for file arguments matching: io={0}'.format(io))
    for argName, arg in self._argdict.items():
        if isinstance(arg, argFile):
            msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
            if io is not None and arg.io != io:
                continue
            msg.debug('Argument {0} matches criteria'.format(argName))
            res.append(arg)
    return res
779 
780 
def validateInFiles(self):
    '''Run standard validation over all input files, unless disabled by arguments.

    Validation is skipped when any of skipFileValidation, skipInputFileValidation,
    fileValidation=False or inputFileValidation=False is set. Records os.times()
    snapshots in _inFileValidationStart/_inFileValidationStop for timing reports.
    '''
    if self._inFileValidationStart is None:
        self._inFileValidationStart = os.times()
        msg.debug('inFileValidationStart time is {0}'.format(self._inFileValidationStart))

    if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
        ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
        ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
        ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)
        ):
        msg.info('Standard input file validation turned off for transform %s.', self.name)
    else:
        msg.info('Validating input files')
        if 'parallelFileValidation' in self._argdict:
            trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
        else:
            trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')

    self._inFileValidationStop = os.times()
    msg.debug('inFileValidationStop time is {0}'.format(self._inFileValidationStop))
801 
def validateOutFiles(self):
    '''Run standard validation over all output files, unless disabled by arguments.

    Validation is skipped when any of skipFileValidation, skipOutputFileValidation,
    fileValidation=False or outputFileValidation=False is set. Parallel and
    multithreaded modes are taken from the corresponding arguments when present.
    Records os.times() snapshots in _outFileValidationStart/_outFileValidationStop.
    '''
    if self._outFileValidationStart is None:
        self._outFileValidationStart = os.times()
        msg.debug('outFileValidationStart time is {0}'.format(self._outFileValidationStart))

    if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
        ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
        ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
        ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)
        ):
        msg.info('Standard output file validation turned off for transform %s.', self.name)
    else:
        msg.info('Validating output files')
        parparallelMode = False
        parmultithreadedMode = False
        if 'parallelFileValidation' in self._argdict:
            parparallelMode = self._argdict['parallelFileValidation'].value
        if 'multithreadedFileValidation' in self._argdict:
            parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
        trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)

    self._outFileValidationStop = os.times()
    msg.debug('outFileValidationStop time is {0}'.format(self._outFileValidationStop))
PyJobTransforms.trfSignal
Signal handling utilities for ATLAS job transforms.
python.trfUtils.isInteractiveEnv
def isInteractiveEnv()
Definition: trfUtils.py:703
python.trfExceptions.TransformNeedCheckException
Exception used when the job wants to signal that it should get manual intervention at Tier-0.
Definition: trfExceptions.py:91
PyJobTransforms.trfAMI
Utilities for configuration of transforms via AMI tags.
python.transform.transform.dataDictionary
def dataDictionary(self)
Definition: transform.py:137
python.trfUtils.pickledDump
def pickledDump(argdict)
Dump a list of arguments to the pickle file given in the 'dumpPickle' argument.
Definition: trfUtils.py:598
python.transform.transform._argdict
_argdict
Argument dictionary for this transform.
Definition: transform.py:81
python.transform.transform._setupGraph
def _setupGraph(self)
Setup the executor graph.
Definition: transform.py:496
python.trfExceptions.TransformSetupException
Setup exceptions.
Definition: trfExceptions.py:42
python.transform.transform.execute
def execute(self)
Execute transform.
Definition: transform.py:383
vtune_athena.format
format
Definition: vtune_athena.py:14
PyJobTransforms.trfGraph
Transform graph utilities.
python.transform.transform.lastExecuted
def lastExecuted(self)
Return the last executor which actually executed.
Definition: transform.py:624
python.trfArgs.addValidationArguments
def addValidationArguments(parser)
Definition: trfArgs.py:546
python.transform.transform._outputData
_outputData
Definition: transform.py:499
python.transform.transform.report
def report(self)
Definition: transform.py:141
python.transform.transform._tracePath
def _tracePath(self)
Trace the path through the executor graph.
Definition: transform.py:573
index
Definition: index.py:1
python.trfSignal.resetTrfSignalHandlers
def resetTrfSignalHandlers()
Restore signal handlers to the default ones.
Definition: trfSignal.py:40
python.trfUtils.calcCpuTime
def calcCpuTime(start, stop)
Definition: trfUtils.py:1683
python.trfExceptions.TransformArgException
Group of argument based exceptions.
Definition: trfExceptions.py:38
python.transform.transform.parser
parser
Definition: transform.py:68
PyJobTransforms.trfReports
Transform report classes and helper functions.
python.transform.transform.outFileValidationWallTime
def outFileValidationWallTime(self)
Definition: transform.py:189
python.transform.transform._dataDictionary
_dataDictionary
Data dictionary placeholder (this maps data types to their argFile instances)
Definition: transform.py:84
python.transform.transform.inFileValidationWallTime
def inFileValidationWallTime(self)
Definition: transform.py:173
python.transform.transform.outFileValidationCpuTime
def outFileValidationCpuTime(self)
Definition: transform.py:181
python.transform.transform
Core transform class.
Definition: transform.py:39
python.transform.transform.getValidationOption
def getValidationOption(self, key)
Getter for a specific validation option.
Definition: transform.py:759
PyJobTransforms.trfArgClasses
Transform argument class definitions.
python.transform.transform.exitCode
def exitCode(self)
Definition: transform.py:117
python.trfArgs.addStandardTrfArgs
def addStandardTrfArgs(parser)
Add standard transform arguments to an argparse ArgumentParser.
Definition: trfArgs.py:16
python.transform.transform.setGlobalLogLevel
def setGlobalLogLevel(self)
Check transform argument dictionary and set the correct root logger option.
Definition: transform.py:367
python.transform.transform.inFileValidationCpuTime
def inFileValidationCpuTime(self)
Definition: transform.py:165
PyJobTransforms.trfExitCodes
Module for transform exit codes.
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.transform.transform.transformSetupCpuTime
def transformSetupCpuTime(self)
Definition: transform.py:149
python.transform.transform._outFileValidationStop
_outFileValidationStop
Definition: transform.py:57
python.transform.transform.getValidationDict
def getValidationDict(self)
Getter function for transform validation dictionary.
Definition: transform.py:753
python.transform.transform.generateReport
def generateReport(self, reportType=None, fast=False, fileReport=defaultFileReport)
Transform report generator.
Definition: transform.py:645
python.transform.transform._trfPredata
_trfPredata
Get trf pre-data as early as possible.
Definition: transform.py:60
python.trfLogger.setRootLoggerLevel
def setRootLoggerLevel(level)
Change the logging level of the root logger.
Definition: trfLogger.py:56
python.transform.transform._processedEvents
_processedEvents
Transform processed events.
Definition: transform.py:103
python.transform.transform._inputData
_inputData
Definition: transform.py:498
python.transform.transform.exitMsg
def exitMsg(self)
Definition: transform.py:125
python.trfUtils.cliToKey
def cliToKey(option)
Convert a command line option to the dictionary key that will be used by argparse.
Definition: trfUtils.py:652
python.transform.transform.validateOutFiles
def validateOutFiles(self)
Definition: transform.py:802
python.transform.transform.validateInFiles
def validateInFiles(self)
Definition: transform.py:781
TagInfo
This class contains the list of currently valid tags for detector description - GeoModel and IOV/Cond...
Definition: TagInfo.h:41
python.transform.transform.outFileValidationStop
def outFileValidationStop(self)
Definition: transform.py:197
python.transform.transform.updateValidationDict
def updateValidationDict(self, newValidationOptions)
Setter for transform's validation dictionary.
Definition: transform.py:748
python.transform.transform._outFileValidationStart
_outFileValidationStart
Definition: transform.py:56
python.transform.transform._inFileValidationStart
_inFileValidationStart
Definition: transform.py:54
python.trfExceptions.TransformAMIException
Exception used by configuration via AMI tags.
Definition: trfExceptions.py:86
python.trfExeStepTools.getTotalExecutorSteps
def getTotalExecutorSteps(executor, argdict=None)
Definition: trfExeStepTools.py:29
python.transform.transform.transformStart
def transformStart(self)
Definition: transform.py:145
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
add
bool add(const std::string &hname, TKey *tobj)
Definition: fastadd.cxx:55
python.trfUtils.convertToStr
def convertToStr(in_string)
Recursively convert unicode to str, useful when we have just loaded something from json (TODO: make t...
Definition: trfUtils.py:636
python.trfExceptions.TransformInternalException
Transform internal errors.
Definition: trfExceptions.py:74
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.transform.transform._exitWithReport
def _exitWithReport(self, signum, frame)
Common signal handler.
Definition: transform.py:724
PyJobTransforms.trfValidation
Validation control for job transforms.
python.transform.transform.name
def name(self)
Definition: transform.py:113
python.transform.transform.setupSplitting
def setupSplitting(self)
Setup executor splitting.
Definition: transform.py:538
beamspotman.dir
string dir
Definition: beamspotman.py:623
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:224
python.trfExceptions.TransformTimeoutException
Exception used by time limited executions.
Definition: trfExceptions.py:78
python.transform.transform.parseCmdLineArgs
def parseCmdLineArgs(self, args)
Parse command line arguments for a transform.
Definition: transform.py:243
python.transform.transform.trfPredata
def trfPredata(self)
Definition: transform.py:201
python.transform.transform.getFiles
def getFiles(self, io=None)
Return a list of fileArgs used by the transform.
Definition: transform.py:768
python.transform.transform.transformSetupWallTime
def transformSetupWallTime(self)
Definition: transform.py:157
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.transform.transform._exitMsg
_exitMsg
Definition: transform.py:97
python.transform.transform._doSteering
def _doSteering(self, steeringDict=None)
Setup steering, which manipulates the graph before we trace the path for this transform.
Definition: transform.py:588
TrigJetMonitorAlgorithm.items
items
Definition: TrigJetMonitorAlgorithm.py:79
python.transform.transform._executorDictionary
_executorDictionary
Definition: transform.py:89
python.transform.transform.appendToExecutorSet
def appendToExecutorSet(self, executors)
Definition: transform.py:220
python.trfSignal.setTrfSignalHandlers
def setTrfSignalHandlers(handler)
Install common handler for various signals.
Definition: trfSignal.py:28
python.trfUtils.calcWallTime
def calcWallTime(start, stop)
Definition: trfUtils.py:1691
python.trfArgs.addFileValidationArguments
def addFileValidationArguments(parser)
Definition: trfArgs.py:526
python.transform.transform._transformStart
_transformStart
Get transform starting timestamp as early as possible.
Definition: transform.py:51
PyJobTransforms.trfExe
Transform execution functions.
python.trfExceptions.TransformException
Base class for transform exceptions.
Definition: trfExceptions.py:14
Trk::open
@ open
Definition: BinningType.h:40
dqt_zlumi_pandas.update
update
Definition: dqt_zlumi_pandas.py:42
python.transform.transform._executors
_executors
Definition: transform.py:88
python.trfUtils.JSONDump
def JSONDump(argdict)
Dump a list of arguments to the JSON file given in the 'dumpJSON' argument.
Definition: trfUtils.py:617
PyJobTransforms.trfUtils
Transform utility functions.
python.transform.transform._executorGraph
_executorGraph
Definition: transform.py:534
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
PyJobTransforms.trfLogger
Logging configuration for ATLAS job transforms.
python.transform.transform._executorPath
_executorPath
Definition: transform.py:576
Muon::print
std::string print(const MuPatSegment &)
Definition: MuonTrackSteering.cxx:28
confTool.parse_args
def parse_args()
Definition: confTool.py:35
python.transform.transform.__init__
def __init__(self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True, trfName=None, executor=None, exeArgs=None, description='')
Initialise a job transform.
Definition: transform.py:46
python.transform.transform._inFileValidationStop
_inFileValidationStop
Definition: transform.py:55
pickleTool.object
object
Definition: pickleTool.py:30
python.transform.transform.executors
def executors(self)
Definition: transform.py:205
str
Definition: BTagTrackIpAccessor.cxx:11
python.transform.transform._report
_report
Report object for this transform.
Definition: transform.py:100
python.transform.transform.argdict
def argdict(self)
Definition: transform.py:133
python.trfUtils.shQuoteStrings
def shQuoteStrings(strArray=sys.argv)
Quote a string array so that it can be echoed back on the command line in a cut 'n' paste safe way.
Definition: trfUtils.py:361
python.transform.transform.getProcessedEvents
def getProcessedEvents(self)
Definition: transform.py:212
python.transform.transform.processedEvents
def processedEvents(self)
Definition: transform.py:209
python.transform.transform._exitCode
_exitCode
Transform exit code/message holders.
Definition: transform.py:96
python.transform.transform._name
_name
Transform _name.
Definition: transform.py:63
python.trfUtils.infanticide
def infanticide(childPIDs=None, sleepTime=3, message=True, listOrphans=False)
Kill all PIDs.
Definition: trfUtils.py:132