ATLAS Offline Software
trfExe.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 
8 
9 import json
10 import math
11 import os
12 import os.path as path
13 import re
14 import signal
15 import subprocess
16 import sys
17 import time
18 
19 import logging
20 from fnmatch import fnmatch
21 msg = logging.getLogger(__name__)
22 
23 from PyJobTransforms.trfJobOptions import JobOptionsTemplate
24 from PyJobTransforms.trfUtils import asetupReport, asetupReleaseIsOlderThan, unpackDBRelease, setupDBRelease, \
25  cvmfsDBReleaseCheck, forceToAlphaNum, \
26  ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
27 from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix
28 from PyJobTransforms.trfExitCodes import trfExit
29 from PyJobTransforms.trfLogger import stdLogLevels
30 from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler
31 from PyJobTransforms.trfMTTools import detectAthenaMTThreads
32 
33 import PyJobTransforms.trfExceptions as trfExceptions
34 import PyJobTransforms.trfValidation as trfValidation
35 import PyJobTransforms.trfArgClasses as trfArgClasses
36 import PyJobTransforms.trfEnv as trfEnv
37 
38 
39 # Depending on the setting of LANG, sys.stdout may end up with ascii or ansi
40 # encoding, rather than utf-8. But Athena uses unicode for some log messages
41 # (another example of Gell-Mann's totalitarian principle) and that will result
42 # in a crash with python 3. In such a case, force the use of a utf-8 encoded
43 # output stream instead.
def _encoding_stream(s):
    """Return a stream that can carry unicode log messages.

    When the encoding reported by ``s`` contains 'ascii' or 'ansi', a new
    text stream encoded as utf-8 is opened on the same file descriptor and
    returned; otherwise ``s`` itself is handed back untouched.

    s -- text stream to inspect (this file passes sys.stdout)
    """
    # Some stream objects (io.StringIO, stdout replacements) report no
    # encoding at all; treat that as "nothing to fix" instead of crashing
    enc = (getattr(s, 'encoding', None) or '').lower()
    if 'ascii' in enc or 'ansi' in enc:
        return open(s.fileno(), 'w', encoding='utf-8')
    return s
49 
50 
51 
57 
58 
def __init__(self, argdict=None, dataDictionary=None, firstExecutor=False):
    """Set up the configuration holder for an executor.

    argdict        -- argument dictionary; a fresh empty dict when omitted
    dataDictionary -- data dictionary; a fresh empty dict when omitted
    firstExecutor  -- whether this configuration belongs to the first executor
    """
    # A literal {} default would be one dict object shared by every instance,
    # and addToArgdict()/addToDataDictionary() write into these dictionaries
    self._argdict = {} if argdict is None else argdict
    self._dataDictionary = {} if dataDictionary is None else dataDictionary
    self._firstExecutor = firstExecutor
    self._executorStep = -1
    # Initialised so the totalExecutorSteps property can be read before it is set
    self._totalExecutorSteps = 0
68 
@property
def argdict(self):
    """Argument dictionary held by this configuration."""
    return self._argdict

@argdict.setter
def argdict(self, value):
    self._argdict = value

@property
def dataDictionary(self):
    """Data dictionary held by this configuration."""
    return self._dataDictionary

@dataDictionary.setter
def dataDictionary(self, value):
    self._dataDictionary = value

@property
def firstExecutor(self):
    """Flag stored as the firstExecutor setting."""
    return self._firstExecutor

@firstExecutor.setter
def firstExecutor(self, value):
    self._firstExecutor = value

@property
def executorStep(self):
    """Stored executor step (initialised to -1 by the constructor)."""
    return self._executorStep

@executorStep.setter
def executorStep(self, value):
    self._executorStep = value

@property
def totalExecutorSteps(self):
    """Stored total number of executor steps."""
    # The def line of this getter was missing, which left both the getter
    # and the '@totalExecutorSteps.setter' below it unusable
    return self._totalExecutorSteps

@totalExecutorSteps.setter
def totalExecutorSteps(self, value):
    self._totalExecutorSteps = value
108 
109 
def setFromTransform(self, trf):
    """Take over the argument and data dictionaries of transform ``trf``."""
    self._dataDictionary = trf.dataDictionary
    self._argdict = trf.argdict


def addToArgdict(self, key, value):
    """Store ``value`` under ``key`` in the argument dictionary."""
    self._argdict[key] = value


def addToDataDictionary(self, key, value):
    """Store ``value`` under ``key`` in the data dictionary."""
    self._dataDictionary[key] = value
123 
124 
125 
127 
128 
def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
    """Initialise the bookkeeping shared by all executors.

    name    -- executor name, passed through forceToAlphaNum()
    trf     -- optional transform; used to fill a newly created configuration
    conf    -- optional ready-made configuration object, used as is
    inData  -- data types this executor can start from
    outData -- data types this executor can produce

    Raises TransformSetupException when a data type is listed as both input
    and output.
    """
    # Some information to produce helpful log messages
    self._name = forceToAlphaNum(name)

    # Data this executor can start from and produce.
    # A plain 'NULL' is stored as 'inNULL' / 'outNULL' as a convenience
    self._inData = {'inNULL' if dataType == 'NULL' else dataType for dataType in inData}
    self._outData = {'outNULL' if dataType == 'NULL' else dataType for dataType in outData}

    # It's forbidden for an executor to consume and produce the same datatype
    dataOverlap = self._inData & self._outData
    if dataOverlap:
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
                                                    'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self._name, ' '.join(dataOverlap)))

    # Executor configuration: an explicit conf wins, otherwise build a fresh
    # one and, when a transform is given, fill it from that transform
    if conf is None:
        self.conf = executorConfig()
        if trf is not None:
            self.conf.setFromTransform(trf)
    else:
        self.conf = conf

    # Execution status
    self._hasExecuted = False
    self._rc = -1
    self._errMsg = None

    # Validation status
    self._hasValidated = False
    self._isValidated = False

    # Extra metadata for this executor which will be provided in job reports
    self._extraMetadata = {}

    # Timestamps and resource bookkeeping, filled in as the executor runs
    self._preExeStart = None
    self._exeStart = None
    self._exeStop = None
    self._valStart = None
    self._valStop = None
    self._memStats = {}
    self._memLeakResult = {}
    self._memFullFile = None
    self._eventCount = None
    self._athenaMP = 0
    self._athenaMT = 0
    self._dbMonitor = None
    self._resimevents = None
    self._containerSetup = None

    # Holder for execution information about any merges done by this executor in MP mode
    self._myMerger = []
202 
203 
204 
@property
def myMerger(self):
    """List holding information about merges done by this executor."""
    return self._myMerger

@property
def name(self):
    """Name of this executor."""
    return self._name

@name.setter
def name(self, value):
    self._name = value
216 
@property
def substep(self):
    """The _substep attribute, or None when it was never set."""
    return getattr(self, '_substep', None)

@property
def trf(self):
    """The _trf attribute, or None when it was never set."""
    return getattr(self, '_trf', None)

@trf.setter
def trf(self, value):
    self._trf = value
232 
@property
def inData(self):
    """Set of input data types, or None when it was never set."""
    return getattr(self, '_inData', None)

@inData.setter
def inData(self, value):
    self._inData = set(value)

def inDataUpdate(self, value):
    """Add the data types in ``value`` to the input set, creating it if needed."""
    if hasattr(self, '_inData'):
        self._inData.update(value)
        return
    # No set yet: go through the property setter
    self.inData = value


@property
def outData(self):
    """Set of output data types, or None when it was never set."""
    return getattr(self, '_outData', None)

@outData.setter
def outData(self, value):
    self._outData = set(value)

def outDataUpdate(self, value):
    """Add the data types in ``value`` to the output set, creating it if needed."""
    if hasattr(self, '_outData'):
        self._outData.update(value)
        return
    # No set yet: go through the property setter
    self.outData = value
271 
@property
def input(self):
    """The _input attribute, or None when it was never set."""
    return getattr(self, '_input', None)

@property
def output(self):
    """The _output attribute, or None when it was never set."""
    return getattr(self, '_output', None)
289 
@property
def extraMetadata(self):
    """Dictionary of extra metadata for this executor."""
    return self._extraMetadata

@property
def hasExecuted(self):
    """Stored execution flag."""
    return self._hasExecuted

@property
def rc(self):
    """Stored return code."""
    return self._rc

@property
def errMsg(self):
    """Stored error message."""
    return self._errMsg

@property
def validation(self):
    """Stored validation setting."""
    return self._validation

@validation.setter
def validation(self, value):
    self._validation = value

@property
def hasValidated(self):
    """Stored 'validation has run' flag."""
    return self._hasValidated

@property
def isValidated(self):
    """Stored 'validation succeeded' flag."""
    return self._isValidated


@property
def first(self):
    """The _first attribute, or None when it was never set."""
    if not hasattr(self, '_first'):
        return None
    return self._first
329 
@property
def preExeStartTimes(self):
    """Timestamp stored by setPreExeStart(), or None."""
    return self._preExeStart

@property
def exeStartTimes(self):
    """Timestamp stored at the start of execution, or None."""
    return self._exeStart

@property
def exeStopTimes(self):
    """Timestamp stored at the end of execution, or None."""
    return self._exeStop

@property
def valStartTimes(self):
    """Timestamp stored by setValStart(), or None."""
    return self._valStart

@property
def valStopTimes(self):
    """Timestamp stored at the end of validation, or None."""
    return self._valStop
349 
@property
def preExeCpuTime(self):
    """calcCpuTime between pre-execute start and execute start, or None."""
    if not (self._preExeStart and self._exeStart):
        return None
    return calcCpuTime(self._preExeStart, self._exeStart)

@property
def preExeWallTime(self):
    """calcWallTime between pre-execute start and execute start, or None."""
    if not (self._preExeStart and self._exeStart):
        return None
    return calcWallTime(self._preExeStart, self._exeStart)

@property
def cpuTime(self):
    """calcCpuTime between execute start and stop, or None."""
    if not (self._exeStart and self._exeStop):
        return None
    return calcCpuTime(self._exeStart, self._exeStop)

@property
def usrTime(self):
    """Difference of element [2] of the execute stop/start timestamps, or None."""
    if not (self._exeStart and self._exeStop):
        return None
    return self._exeStop[2] - self._exeStart[2]

@property
def sysTime(self):
    """Difference of element [3] of the execute stop/start timestamps, or None."""
    if not (self._exeStart and self._exeStop):
        return None
    return self._exeStop[3] - self._exeStart[3]

@property
def wallTime(self):
    """calcWallTime between execute start and stop, or None."""
    if not (self._exeStart and self._exeStop):
        return None
    return calcWallTime(self._exeStart, self._exeStop)

@property
def memStats(self):
    """Dictionary of memory statistics."""
    return self._memStats

@property
def memAnalysis(self):
    """Stored memory leak result dictionary."""
    return self._memLeakResult

@property
def postExeCpuTime(self):
    """calcCpuTime between execute stop and validation start, or None."""
    if not (self._exeStop and self._valStart):
        return None
    return calcCpuTime(self._exeStop, self._valStart)

@property
def postExeWallTime(self):
    """calcWallTime between execute stop and validation start, or None."""
    if not (self._exeStop and self._valStart):
        return None
    return calcWallTime(self._exeStop, self._valStart)

@property
def validationCpuTime(self):
    """calcCpuTime between validation start and stop, or None."""
    if not (self._valStart and self._valStop):
        return None
    return calcCpuTime(self._valStart, self._valStop)
420 
@property
def validationWallTime(self):
    """calcWallTime between validation start and stop, or None."""
    # NOTE(review): the def line of this property was missing; the name is
    # chosen to pair with validationCpuTime - confirm against callers
    if not (self._valStart and self._valStop):
        return None
    return calcWallTime(self._valStart, self._valStop)
427 
@property
def cpuTimeTotal(self):
    """calcCpuTime between pre-execute start and validation stop, or None."""
    if not (self._preExeStart and self._valStop):
        return None
    return calcCpuTime(self._preExeStart, self._valStop)

@property
def wallTimeTotal(self):
    """calcWallTime between pre-execute start and validation stop, or None."""
    if not (self._preExeStart and self._valStop):
        return None
    return calcWallTime(self._preExeStart, self._valStop)
441 
@property
def eventCount(self):
    """Stored event count (None until one is recorded)."""
    return self._eventCount

@property
def reSimEvent(self):
    """Stored _resimevents value."""
    return self._resimevents

@property
def athenaMP(self):
    """Stored _athenaMP value."""
    return self._athenaMP

@property
def dbMonitor(self):
    """Stored _dbMonitor value."""
    return self._dbMonitor
457 
458 
# set start times, if not set already
def setPreExeStart(self):
    """Record os.times() as the pre-execute start, unless already recorded."""
    if self._preExeStart is not None:
        return
    self._preExeStart = os.times()
    msg.debug('preExeStart time is {0}'.format(self._preExeStart))

def setValStart(self):
    """Record os.times() as the validation start, unless already recorded."""
    if self._valStart is not None:
        return
    self._valStart = os.times()
    msg.debug('valStart time is {0}'.format(self._valStart))
469 
def preExecute(self, input = set(), output = set()):
    """Record the pre-execute start time and log the step.

    ``input`` and ``output`` are accepted but not used here.
    """
    self.setPreExeStart()
    msg.info('Preexecute for %s', self._name)
473 
def execute(self):
    """Do-nothing execution step.

    Records start and stop timestamps, marks the executor as executed and
    sets a return code of 0 with an empty error message.
    """
    self._exeStart = os.times()
    msg.debug('exeStart time is {0}'.format(self._exeStart))
    msg.info('Starting execution of %s', self._name)
    self._hasExecuted = True
    self._rc = 0
    self._errMsg = ''
    msg.info('%s executor returns %d', self._name, self._rc)
    self._exeStop = os.times()
    # This timestamp is the execution stop time; the message used to label
    # it 'preExeStop', unlike every other execute() in this file
    msg.debug('exeStop time is {0}'.format(self._exeStop))
484 
def postExecute(self):
    """Log the post-execute step; nothing else is done here."""
    msg.info('Postexecute for %s', self._name)
487 
def validate(self):
    """Mark the executor as validated without performing any checks."""
    self.setValStart()
    self._hasValidated = True
    msg.info('Executor %s has no validation function - assuming all ok', self._name)
    # With no validation function the executor is simply declared good
    self._isValidated = True
    self._errMsg = ''
    self._valStop = os.times()
    msg.debug('valStop time is {0}'.format(self._valStop))
496 
497 
def doAll(self, input=set(), output=set()):
    """Run preExecute, execute, postExecute and validate, in that order.

    ``input`` and ``output`` are passed on to preExecute() only.
    """
    self.preExecute(input, output)
    for stage in (self.execute, self.postExecute, self.validate):
        stage()
503 
504 
def __init__(self, name = 'Logscan'):
    """Create a log scanning executor; only the name can be chosen."""
    super(logscanExecutor, self).__init__(name=name)
    # Both are filled in later: the logfile name in preExecute(), the error
    # mask files are only read (if set) in validate()
    self._logFileName = None
    self._errorMaskFiles = None
510 
def preExecute(self, input = set(), output = set()):
    """Record the start time and pick up the logfile name from the arguments.

    ``input`` and ``output`` are accepted but not used here.
    """
    self.setPreExeStart()
    msg.info('Preexecute for %s', self._name)
    arguments = self.conf.argdict
    if 'logfile' in arguments:
        self._logFileName = arguments['logfile'].value
516 
def validate(self):
    """Scan the configured logfile for errors and validate the executor.

    When a logfile name was picked up in preExecute() the file is scanned
    with trfValidation.athenaLogFileReport, using ignore patterns taken from
    the 'ignoreFiles' argument, else from self._errorMaskFiles, else from the
    athenaExecutor default list.

    Raises TransformLogfileErrorException when the worst message is at ERROR
    level or above, unless it is exactly ERROR and ignoreErrors is True.
    """
    self.setValStart()
    # Record that validation ran, as the other validate() methods in this
    # file do
    self._hasValidated = True
    msg.info("Starting validation for {0}".format(self._name))
    if self._logFileName:
        arguments = self.conf.argdict

        # Extra search patterns supplied on the command line
        if 'ignorePatterns' in arguments:
            igPat = arguments['ignorePatterns'].value
        else:
            igPat = []

        # Which error mask files to use: argument, then executor setting, then default
        if 'ignoreFiles' in arguments:
            maskFiles = arguments['ignoreFiles'].value
        elif self._errorMaskFiles is not None:
            maskFiles = self._errorMaskFiles
        else:
            maskFiles = athenaExecutor._defaultIgnorePatternFile
        ignorePatterns = trfValidation.ignorePatterns(files = maskFiles, extraSearch=igPat)

        # Now actually scan my logfile
        msg.info('Scanning logfile {0} for errors'.format(self._logFileName))
        self._logScan = trfValidation.athenaLogFileReport(logfile = self._logFileName, ignoreList = ignorePatterns)
        worstError = self._logScan.worstError()

        # In general we add the error message to the exit message, but if it's too long then don't do
        # that and just say look in the jobReport
        firstError = worstError['firstError']
        if firstError:
            if len(firstError['message']) > athenaExecutor._exitMessageLimit:
                if 'CoreDumpSvc' in firstError['message']:
                    exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(firstError['firstLine'])
                elif 'G4Exception' in firstError['message']:
                    exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(firstError['firstLine'])
                else:
                    exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], firstError['firstLine'])
            else:
                exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, firstError['message'])
        else:
            exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])

        # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
        if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in arguments and arguments['ignoreErrors'].value is True):
            msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
        elif worstError['nLevel'] >= stdLogLevels['ERROR']:
            self._isValidated = False
            msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
            raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
                                                               'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))

    # Must be ok if we got here!
    # NOTE(review): placed at method level, i.e. also reached when no logfile
    # was configured - confirm against the intended nesting
    msg.info('Executor {0} has validated successfully'.format(self.name))
    self._isValidated = True
    self._errMsg = ''

    self._valStop = os.times()
    msg.debug('valStop time is {0}'.format(self._valStop))
570 
571 
def __init__(self, name = 'Echo', trf = None):
    """Create an echo executor; only the default name differs from the parent."""
    super(echoExecutor, self).__init__(trf=trf, name=name)
577 
578 
def execute(self):
    """Print every entry of the argument dictionary to stdout.

    Always finishes with return code 0 and an empty error message.
    """
    self._exeStart = os.times()
    msg.debug('exeStart time is {0}'.format(self._exeStart))
    msg.info('Starting execution of %s', self._name)
    msg.info('Transform argument dictionary now follows:')
    for key, value in self.conf.argdict.items():
        print("%s = %s" % (key, value))
    self._hasExecuted = True
    self._rc = 0
    self._errMsg = ''
    msg.info('%s executor returns %d', self._name, self._rc)
    self._exeStop = os.times()
    msg.debug('exeStop time is {0}'.format(self._exeStop))
592 
593 
def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
    """Create a dummy executor; every argument is passed on to the parent."""
    super(dummyExecutor, self).__init__(
        name=name,
        trf=trf,
        conf=conf,
        inData=inData,
        outData=outData,
    )
599 
600 
def execute(self):
    """Create (or touch) a dummy output file for each matching argument.

    For every output data type, each argument whose key contains that type
    name has the first entry of its value opened in append mode and closed
    again. Always finishes with return code 0.
    """
    self._exeStart = os.times()
    msg.debug('exeStart time is {0}'.format(self._exeStart))
    msg.info('Starting execution of %s', self._name)
    arguments = self.conf.argdict
    for dataType in self._outData:
        for argName in arguments:
            # Substring match of the data type name against the argument key
            if dataType not in argName:
                continue
            msg.info('Creating dummy output file: {0}'.format(arguments[argName].value[0]))
            open(arguments[argName].value[0], 'a').close()
    self._hasExecuted = True
    self._rc = 0
    self._errMsg = ''
    msg.info('%s executor returns %d', self._name, self._rc)
    self._exeStop = os.times()
    msg.debug('exeStop time is {0}'.format(self._exeStop))
616 
617 
def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData = set(),
             exe = None, exeArgs = None, memMonitor = True):
    """Create an executor that runs an external script.

    exe        -- name of the script to execute
    exeArgs    -- argument names whose argdict values are pasted onto the command line
    memMonitor -- whether a memory monitor should be run alongside the script
    The remaining arguments are passed on to the parent constructor.
    """
    # These are set before the parent constructor runs, as in the original order
    self._exe = exe
    self._exeArgs = exeArgs

    super(scriptExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)

    self._extraMetadata['script'] = exe

    # Decide if we echo script output to stdout
    self._echoOutput = False

    # Can either be written by base class or child
    self._cmd = None

    self._memMonitor = memMonitor
638 
@property
def exe(self):
    """Name of the script to execute."""
    return self._exe

@exe.setter
def exe(self, value):
    # Keep the reported metadata in step with the executable
    self._extraMetadata['script'] = value
    self._exe = value

@property
def exeArgs(self):
    """Argument names used to build the command line."""
    return self._exeArgs

@exeArgs.setter
def exeArgs(self, value):
    self._exeArgs = value
656 
def preExecute(self, input = set(), output = set()):
    """Prepare the command, the echo policy and the loggers for execution.

    input  -- stored as self._input
    output -- stored as self._output
    """
    self.setPreExeStart()
    msg.debug('scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    self._input = input
    self._output = output

    # A command that is already set is left alone
    if self._cmd is None:
        self._buildStandardCommand()
    msg.info('Will execute script as %s', self._cmd)

    # Define this here to have it for environment detection messages
    self._logFileName = "log.{0}".format(self._name)

    # Echo configuration: the explicit environment variables are checked
    # first, then the auto-detected settings
    environment = os.environ
    if 'TRF_ECHO' in environment:
        msg.info('TRF_ECHO envvar is set - enabling command echoing to stdout')
        self._echoOutput = True
    elif 'TRF_NOECHO' in environment:
        msg.info('TRF_NOECHO envvar is set - disabling command echoing to stdout')
        self._echoOutput = False
    elif isInteractiveEnv():
        msg.info('Interactive environment detected (stdio or stdout is a tty) - enabling command echoing to stdout')
        self._echoOutput = True
    elif 'TZHOME' in environment:
        msg.info('Tier-0 environment detected - enabling command echoing to stdout')
        self._echoOutput = True
    if self._echoOutput is False:
        msg.info('Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self._name, self._logFileName))

    # Now setup special loggers for logging execution messages to stdout and file
    echoLogger = logging.getLogger(self._name)
    echoLogger.setLevel(logging.INFO)
    echoLogger.propagate = False
    self._echologger = echoLogger

    fileFormat = logging.Formatter('%(asctime)s %(message)s', datefmt='%H:%M:%S')
    self._exeLogFile = logging.FileHandler(self._logFileName, mode='w', encoding='utf-8')
    self._exeLogFile.setFormatter(fileFormat)
    echoLogger.addHandler(self._exeLogFile)

    if self._echoOutput:
        streamFormat = logging.Formatter('%(name)s %(asctime)s %(message)s', datefmt='%H:%M:%S')
        self._echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
        self._echostream.setFormatter(streamFormat)
        echoLogger.addHandler(self._echostream)
704 
def _buildStandardCommand(self):
    """Build self._cmd from the executable and the configured arguments.

    The command starts with the executable. For every name in exeArgs that
    is present in the argument dictionary, the argument value is appended:
    list values element by element, anything else as a single str().

    Raises TransformExecutionException when no executable is set.
    """
    if not self._exe:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                        'No executor set in {0}'.format(self.__class__.__name__))
    self._cmd = [self.exe, ]
    # exeArgs defaults to None in the constructor; treat that as "no arguments"
    for arg in self.exeArgs or []:
        if arg not in self.conf.argdict:
            continue
        # If we have a list then add each element to our list, else just str() the argument value
        # Note if there are arguments which need more complex transformations then
        # consider introducing a special toExeArg() method.
        value = self.conf.argdict[arg].value
        if isinstance(value, list):
            self._cmd.extend([str(v) for v in value])
        else:
            self._cmd.append(str(value))
720 
721 
def execute(self):
    """Run the prepared command and log its output line by line.

    The command's stdout and stderr are read together and passed to the
    echo logger. When memory monitoring is enabled a prmon process is
    started alongside and signalled with SIGUSR1 once the command is done.
    The return code and the start/stop timestamps are stored on the executor.

    Raises TransformExecutionException when running the command raises OSError.
    """
    self._hasExecuted = True
    msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))

    self._exeStart = os.times()
    msg.debug('exeStart time is {0}'.format(self._exeStart))
    if 'execOnly' in self.conf.argdict:
        execOnly = self.conf.argdict['execOnly']
        # Other flags in this file are read through .value (e.g. ignoreErrors),
        # so unwrap the argument object here too; a bare True is still honoured
        if execOnly is True or getattr(execOnly, 'value', None) is True:
            msg.info('execOnly flag is set - execution will now switch, replacing the transform')
            os.execvp(self._cmd[0], self._cmd)

    encargs = {'encoding' : 'utf8'}
    # Stays None until prmon has really been started, so the cleanup in
    # 'finally' never touches an unbound name and cannot mask the real error
    mem_proc = None
    # NOTE(review): _alreadyInContainer and _workdir are not assigned in the
    # part of the file visible here; a missing flag is read as "not in a container"
    nestedContainer = getattr(self, '_alreadyInContainer', False) and self._containerSetup is not None
    try:
        # if we are already inside a container, then
        # must map /srv of the nested container to /srv of the parent container:
        # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
        if nestedContainer:
            msg.info("chdir /srv to launch a nested container for the substep")
            os.chdir("/srv")
        p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
        # go back to the original working directory immediately after to store prmon in it
        if nestedContainer:
            msg.info("chdir {} after launching the nested container".format(self._workdir))
            os.chdir(self._workdir)

        if self._memMonitor:
            try:
                self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
                self._memFullFile = 'prmon.full.' + self._name
                memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
                                     '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
                                     '--interval', '30']
                mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
                # TODO - link mem.full.current to mem.full.SUBSTEP
            except Exception as e:
                # Monitoring is best effort: carry on without it
                msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
                self._memMonitor = False

        while p.poll() is None:
            line = p.stdout.readline()
            if line:
                self._echologger.info(line.rstrip())
        # Hoover up remaining buffered output lines
        for line in p.stdout:
            self._echologger.info(line.rstrip())

        self._rc = p.returncode
        msg.info('%s executor returns %d', self._name, self._rc)
        self._exeStop = os.times()
        msg.debug('exeStop time is {0}'.format(self._exeStop))
    except OSError as e:
        errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
        msg.error(errMsg)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
    finally:
        if self._memMonitor and mem_proc is not None:
            try:
                mem_proc.send_signal(signal.SIGUSR1)
                countWait = 0
                # poll() returns None only while the monitor is still running;
                # an exit code of 0 must end the wait as well
                while mem_proc.poll() is None and countWait < 10:
                    time.sleep(0.1)
                    countWait += 1
            except OSError:
                pass
785 
786 
def postExecute(self):
    """Close the execution logfile and load the memory monitor summary.

    When memory monitoring is enabled the JSON summary file is read into
    self._memStats. Any failure is logged as a warning, monitoring is
    switched off and the statistics are reset to an empty dictionary.
    """
    if hasattr(self._exeLogFile, 'close'):
        self._exeLogFile.close()
    if self._memMonitor:
        try:
            # The context manager closes the file even if parsing fails
            with open(self._memSummaryFile) as memFile:
                self._memStats = json.load(memFile)
        except Exception as e:
            msg.warning('Failed to load JSON memory summmary file {0}: {1}'.format(self._memSummaryFile, e))
            self._memMonitor = False
            self._memStats = {}
798 
799 
def validate(self):
    """Validate the executor from its return code, then check event counts.

    A non-zero return code raises TransformValidationException. Negative
    codes are first rewritten as 128 - rc, and a signal name is put in the
    message when trfExit knows one for the code. The event count check is
    skipped only when the checkEventCount argument evaluates to False for
    this executor.
    """
    if self._valStart is None:
        self._valStart = os.times()
        msg.debug('valStart time is {0}'.format(self._valStart))
    self._hasValidated = True

    # Check the return code
    if self._rc != 0:
        # Want to learn as much as possible from the non-zero code
        # this is a bit hard in general, although one can do signals.
        # Probably need to be more specific per exe, i.e., athena non-zero codes
        self._isValidated = False
        if self._rc < 0:
            # Map return codes to what the shell gives (128 + SIGNUM)
            self._rc = 128 - self._rc
        if trfExit.codeToSignalname(self._rc) != "":
            self._errMsg = '{0} got a {1} signal (exit code {2})'.format(self._name, trfExit.codeToSignalname(self._rc), self._rc)
        else:
            self._errMsg = 'Non-zero return code from %s (%d)' % (self._name, self._rc)
        raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_FAIL'), self._errMsg)

    msg.info('Executor {0} validated successfully (return code {1})'.format(self._name, self._rc))
    self._isValidated = True
    self._errMsg = ''

    # Check event counts unless explicitly switched off for this executor
    arguments = self.conf.argdict
    skipCount = 'checkEventCount' in arguments and arguments['checkEventCount'].returnMyValue(exe=self) is False
    if skipCount:
        msg.info('Event counting for substep {0} is skipped'.format(self.name))
    else:
        checkcount = trfValidation.eventMatch(self)
        checkcount.decide()
        self._eventCount = checkcount.eventCount
        msg.info('Event counting for substep {0} passed'.format(self.name))

    self._valStop = os.times()
    msg.debug('valStop time is {0}'.format(self._valStop))
837 
838 
839 
# Class-level settings; other code in this file reads them as
# athenaExecutor._exitMessageLimit and athenaExecutor._defaultIgnorePatternFile
841  _exitMessageLimit = 200 # Maximum error message length to report in the exitMsg
# File list handed to trfValidation.ignorePatterns when no other error mask files are configured
842  _defaultIgnorePatternFile = ['atlas_error_mask.db']
843 
844 
def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
             inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'],
             substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = None, runtimeRunargs = None,
             literalRunargs = None, dataArgs = None, checkEventCount = False, errorMaskFiles = None,
             manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):
    """Create an athena executor.

    Only the arguments handled here are described; name, trf, conf, inData,
    outData, exe, exeArgs and memMonitor are passed on to the parent
    constructor.

    skeletonFile -- None, a string or a list of strings; a string is stored
                    as a one-element list
    skeletonCA   -- stored as given; with skeletonFile it decides whether a
                    job options template is created
    substep      -- stored after passing through forceToAlphaNum()
    perfMonFile  -- deprecated; only triggers a debug message
    extraRunargs, runtimeRunargs -- dictionaries, a fresh {} when omitted
    literalRunargs, dataArgs     -- lists, a fresh [] when omitted
    checkEventCount and manualDataDictionary are accepted but not used here.
    All remaining arguments are stored unchanged on the instance.
    """
    self._substep = forceToAlphaNum(substep)
    self._inputEventTest = inputEventTest
    self._tryDropAndReload = tryDropAndReload
    # Literal {} / [] defaults would be single objects shared by every
    # executor created without these arguments, so build fresh ones instead.
    # exeArgs keeps its literal default because it is only handed on to the parent.
    self._extraRunargs = {} if extraRunargs is None else extraRunargs
    self._runtimeRunargs = {} if runtimeRunargs is None else runtimeRunargs
    self._literalRunargs = [] if literalRunargs is None else literalRunargs
    self._dataArgs = [] if dataArgs is None else dataArgs
    self._errorMaskFiles = errorMaskFiles
    self._inputDataTypeCountCheck = inputDataTypeCountCheck
    self._disableMT = disableMT
    self._disableMP = disableMP
    self._onlyMP = onlyMP
    self._onlyMT = onlyMT
    self._onlyMPWithRunargs = onlyMPWithRunargs
    self._skeletonCA=skeletonCA

    # Always define the attribute so it exists whether or not a file was given
    self._perfMonFile = None
    if perfMonFile:
        msg.debug("Resource monitoring from PerfMon is now deprecated")

    # SkeletonFile can be None (disable) or a string or a list of strings - normalise it here
    if isinstance(skeletonFile, str):
        self._skeleton = [skeletonFile]
    else:
        self._skeleton = skeletonFile

    super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                         exeArgs=exeArgs, memMonitor=memMonitor)

    # Add athena specific metadata
    self._extraMetadata.update({'substep': substep})

    # Setup JO templates
    if self._skeleton or self._skeletonCA:
        self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
    else:
        self._jobOptionsTemplate = None
924 
@property
def inputDataTypeCountCheck(self):
    """Data types used for the input event count check, as stored."""
    # The def line of this getter was missing, which left both the getter
    # and the '@inputDataTypeCountCheck.setter' below it unusable
    return self._inputDataTypeCountCheck

@inputDataTypeCountCheck.setter
def inputDataTypeCountCheck(self, value):
    self._inputDataTypeCountCheck = value

@property
def substep(self):
    """Substep name of this executor."""
    return self._substep

@property
def disableMP(self):
    """Stored disableMP flag."""
    return self._disableMP

@disableMP.setter
def disableMP(self, value):
    self._disableMP = value

@property
def disableMT(self):
    """Stored disableMT flag."""
    return self._disableMT

@disableMT.setter
def disableMT(self, value):
    self._disableMT = value

@property
def onlyMP(self):
    """Stored onlyMP flag."""
    return self._onlyMP

@onlyMP.setter
def onlyMP(self, value):
    self._onlyMP = value

@property
def onlyMT(self):
    """Stored onlyMT flag."""
    return self._onlyMT

@onlyMT.setter
def onlyMT(self, value):
    self._onlyMT = value
968 
969  def preExecute(self, input = set(), output = set()):
970  self.setPreExeStart()
971  msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
972 
973  # Check we actually have events to process!
974  inputEvents = 0
975  dt = ""
976  if self._inputDataTypeCountCheck is None:
977  self._inputDataTypeCountCheck = input
978  for dataType in self._inputDataTypeCountCheck:
979  if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
980  continue
981 
982  thisInputEvents = self.conf.dataDictionary[dataType].nentries
983  if thisInputEvents > inputEvents:
984  inputEvents = thisInputEvents
985  dt = dataType
986 
987  # Now take into account skipEvents and maxEvents
988  if ('skipEvents' in self.conf.argdict and
989  self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
990  mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
991  else:
992  mySkipEvents = 0
993 
994  if ('maxEvents' in self.conf.argdict and
995  self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
996  myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
997  else:
998  myMaxEvents = -1
999 
1000  # Any events to process...?
1001  if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
1002  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
1003  'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(mySkipEvents, inputEvents, dt))
1004 
1005  try:
1006  # Expected events to process
1007  if (myMaxEvents != -1):
1008  if (self.inData and next(iter(self.inData)) == 'inNULL'):
1009  expectedEvents = myMaxEvents
1010  else:
1011  expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
1012  else:
1013  expectedEvents = inputEvents-mySkipEvents
1014  except TypeError:
1015  # catching type error from UNDEFINED inputEvents count
1016  msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
1017  expectedEvents = 0
1018 
1019 
1022  OSSetupString = None
1023 
1024  # Extract the asetup string
1025  asetupString = None
1026  legacyThreadingRelease = False
1027  if 'asetup' in self.conf.argdict:
1028  asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1029  legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1030  else:
1031  msg.info('Asetup report: {0}'.format(asetupReport()))
1032 
1033  if asetupString is not None:
1034  legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1035  currentOS = os.environ['ALRB_USER_PLATFORM']
1036  if legacyOSRelease and "centos7" not in currentOS:
1037  OSSetupString = "centos7"
1038  msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
1039 
1040 
1041  # allow overriding the container OS using a flag
1042  if 'runInContainer' in self.conf.argdict:
1043  OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1044  msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
1045  if OSSetupString is not None and asetupString is None:
1046  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1047  '--asetup must be used for the substep which requires --runInContainer')
1048 
1049  # Conditional MP based on runtime arguments
1050  if self._onlyMPWithRunargs:
1051  for k in self._onlyMPWithRunargs:
1052  if k in self.conf._argdict:
1053  self._onlyMP = True
1054 
1055  # Check the consistency of parallel configuration: CLI flags + environment.
1056  if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
1057  ('ATHENA_CORE_NUMBER' not in os.environ)):
1058  # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
1059  msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1060  else:
1061  # Try to detect AthenaMT mode, number of threads and number of concurrent events
1062  if not self._disableMT:
1063  self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1064 
1065  # Try to detect AthenaMP mode and number of workers
1066  if not self._disableMP:
1067  self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
1068 
1069  # Check that we actually support MT
1070  if self._onlyMP and self._athenaMT > 0:
1071  msg.info("This configuration does not support MT, falling back to MP")
1072  if self._athenaMP == 0:
1073  self._athenaMP = self._athenaMT
1074  self._athenaMT = 0
1075  self._athenaConcurrentEvents = 0
1076 
1077  # Check that we actually support MP
1078  if self._onlyMT and self._athenaMP > 0:
1079  msg.info("This configuration does not support MP, using MT")
1080  if self._athenaMT == 0:
1081  self._athenaMT = self._athenaMP
1083  self._athenaMP = 0
1084 
1085  # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
1086  # which also avoids issues with zero sized files
1087  if not self._disableMP and expectedEvents < self._athenaMP:
1088  msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
1089  self._disableMP = True
1090  self._athenaMP = 0
1091 
1092  # Handle executor steps
1093  if self.conf.totalExecutorSteps > 1:
1094  for dataType in output:
1095  if self.conf._dataDictionary[dataType].originalName:
1096  self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1097  else:
1098  self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1099  self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
1100  msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1101 
1102  # And if this is (still) athenaMP, then set some options for workers and output file report
1103  if self._athenaMP > 0:
1104  self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
1105  self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
1106  self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
1107  if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
1109  else:
1110  self._athenaMPReadEventOrders = False
1111  # Decide on scheduling
1112  if ('athenaMPStrategy' in self.conf.argdict and
1113  (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
1114  self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1115  else:
1116  self._athenaMPStrategy = 'SharedQueue'
1117 
1118  # See if we have options for the target output file size
1119  if 'athenaMPMergeTargetSize' in self.conf.argdict:
1120  for dataType in output:
1121  if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
1122  self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
1123  msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1124  else:
1125  # Use a globbing strategy
1126  matchedViaGlob = False
1127  for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
1128  if fnmatch(dataType, mtsType):
1129  self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
1130  msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1131  matchedViaGlob = True
1132  break
1133  if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
1134  self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
1135  msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1136 
1137  # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
1138  # so that the mother process output file (if it exists) can be used directly
1139  # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
1140  for dataType in output:
1141  if self.conf.totalExecutorSteps <= 1:
1142  self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1143  if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
1144  if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
1145  msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
1146  else:
1147  self.conf._dataDictionary[dataType].value[0] += "_000"
1148  msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1149  else:
1150  self._athenaMPWorkerTopDir = self._athenaMPFileReport = None
1151 
1152 
1153 
1154  if self._skeleton or self._skeletonCA:
1155  inputFiles = dict()
1156  for dataType in input:
1157  inputFiles[dataType] = self.conf.dataDictionary[dataType]
1158  outputFiles = dict()
1159  for dataType in output:
1160  outputFiles[dataType] = self.conf.dataDictionary[dataType]
1161 
1162  # See if we have any 'extra' file arguments
1163  nameForFiles = commonExecutorStepName(self._name)
1164  for dataType, dataArg in self.conf.dataDictionary.items():
1165  if isinstance(dataArg, list) and dataArg:
1166  if self.conf.totalExecutorSteps <= 1:
1167  raise ValueError('Multiple input arguments provided but only running one substep')
1168  if self.conf.totalExecutorSteps != len(dataArg):
1169  raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1170 
1171  if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
1172  inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1173  else:
1174  if dataArg.io == 'input' and nameForFiles in dataArg.executor:
1175  inputFiles[dataArg.subtype] = dataArg
1176 
1177  msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1178 
1179  # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
1180  self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1181  output = outputFiles)
1182 
1183 
1185  if len(input) > 0:
1186  self._extraMetadata['inputs'] = list(input)
1187  if len(output) > 0:
1188  self._extraMetadata['outputs'] = list(output)
1189 
1190 
1191  dbrelease = dbsetup = None
1192  if 'DBRelease' in self.conf.argdict:
1193  dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1194  if path.islink(dbrelease):
1195  dbrelease = path.realpath(dbrelease)
1196  if dbrelease:
1197  # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
1198  dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1199  if dbdMatch:
1200  msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1201  if not os.access(dbrelease, os.R_OK):
1202  msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1203  msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1204  dbrelease = dbdMatch.group(1)
1205  dbsetup = cvmfsDBReleaseCheck(dbrelease)
1206  else:
1207  # Check if the DBRelease is setup
1208  msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1209  unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1210  if unpacked:
1211  # Now run the setup.py script to customise the paths to the current location...
1212  setupDBRelease(dbsetup)
1213  # For cvmfs we want just the X.Y.Z release string (and also support 'current')
1214  else:
1215  dbsetup = cvmfsDBReleaseCheck(dbrelease)
1216 
1217 
1219  self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1220  self._prepAthenaCommandLine()
1221 
1222 
1223  super(athenaExecutor, self).preExecute(input, output)
1224 
1225  # Now we always write a wrapper, because it's very convenient for re-running individual substeps
1226  # This will have asetup and/or DB release setups in it
1227  # Do this last in this preExecute as the _cmd needs to be finalised
1228  msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
1229  self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1230  msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1231 
1232 
def postExecute(self):
    """Post-process the outputs of this athena substep once it has run.

    After the parent class post-execution this method:
      * for jobs split over several executor steps, lets the AthenaMP output
        handler update this step's outputs (AthenaMP only) and, on the last
        step, rebuilds the file list of every output from the per-step file
        names, merging it when it holds more than one file;
      * for a single-step AthenaMP job, updates the outputs through the
        AthenaMP output handler and merges multi-file outputs;
      * calls ``_targzipJiveXML()`` when ``TXT_JIVEXMLTGZ`` is a known data type;
      * scans ``log.ReSim`` for the ISF_SimEventFilter summary (ReSim only);
      * removes readable, non-cvmfs files of ``temporary`` inputs (and the
        targets they resolve to) when ``deleteIntermediateOutputfiles`` is set.
    """
    super(athenaExecutor, self).postExecute()

    # Handle executor substeps
    if self.conf.totalExecutorSteps > 1:
        if self._athenaMP > 0:
            outputDataDictionary = {dataType: self.conf.dataDictionary[dataType] for dataType in self._output}
            athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
        if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
            # Last step: collect the files written by all steps for each output
            currentStepTag = '_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep)
            for dataType in self._output:
                dataArg = self.conf.dataDictionary[dataType]
                newValue = []
                if self._athenaMP > 0:
                    # assume the same number of workers all the time
                    for i in range(self.conf.totalExecutorSteps):
                        stepTag = '_{0}{1}_'.format(executorStepSuffix, i)
                        for v in dataArg.value:
                            newValue.append(v.replace(currentStepTag, stepTag))
                else:
                    dataArg.multipleOK = True
                    # just combine all executors
                    for i in range(self.conf.totalExecutorSteps):
                        newValue.append(dataArg.originalName + '_{0}{1}'.format(executorStepSuffix, i))
                dataArg.value = newValue

                # do the merging if needed
                if dataArg.io == "output" and len(dataArg.value) > 1:
                    self._smartMerge(dataArg)

    # If this was an athenaMP run then we need to update output files
    elif self._athenaMP > 0:
        outputDataDictionary = {dataType: self.conf.dataDictionary[dataType] for dataType in self._output}

        # File checks are skipped for event service jobs
        skipFileChecks = bool('eventService' in self.conf.argdict and self.conf.argdict['eventService'].value)
        athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
        for dataType in self._output:
            if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
                self._smartMerge(self.conf.dataDictionary[dataType])

    if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
        self._targzipJiveXML()

    # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
    # This is a bit ugly to have such a specific feature here though
    # TODO
    # The best is to have a general approach so that user can extract useful info from log
    # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
    # in which the wanted variable is grouped by a name, then transforms could decode the pattern
    # and use it to extract required info and do the summation during log scan.
    if self._logFileName=='log.ReSim' and self.name=='ReSim':
        msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
        # Actually run the scan announced by the message above
        reportEventsPassedSimFilter(self._logFileName)

    # Remove intermediate input/output files of sub-steps
    # Delete only files with io="temporary" which are files with pattern "tmp*"
    # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
    # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
    # Enable if --deleteIntermediateOutputfiles is set
    if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
        inputDataDictionary = {dataType: self.conf.dataDictionary[dataType] for dataType in self._input}

        for k, v in inputDataDictionary.items():
            if not v.io == 'temporary':
                continue
            for filename in v.value:
                if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
                    msg.info("Removing intermediate {0} input file {1}".format(k, filename))
                    # Resolve the path before unlinking, so that the file a symbolic
                    # link points to can be deleted as well. The target is reset for
                    # every file: it must never be undefined or left over from a
                    # previous iteration.
                    resolvedPath = os.path.realpath(filename)
                    targetpath = resolvedPath if resolvedPath != filename else None
                    os.unlink(filename)
                    if targetpath and os.access(targetpath, os.R_OK):
                        os.unlink(targetpath)
1308 
1309 
1310  def validate(self):
1311  self.setValStart()
1312  self._hasValidated = True
1313  deferredException = None
1314  memLeakThreshold = 5000
1315  _hasMemLeak = False
1316 
1317 
1318  try:
1319  super(athenaExecutor, self).validate()
1321  # In this case we hold this exception until the logfile has been scanned
1322  msg.error('Validation of return code failed: {0!s}'.format(e))
1323  deferredException = e
1324 
1325 
1332  if self._memFullFile:
1333  msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
1334  self._memLeakResult = analytic().getFittedData(self._memFullFile)
1335  if self._memLeakResult:
1336  if self._memLeakResult['slope'] > memLeakThreshold:
1337  _hasMemLeak = True
1338  msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
1339  else:
1340  msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
1341  else:
1342  msg.info('No memory monitor file to be analysed')
1343 
1344  # Logfile scan setup
1345  # Always use ignorePatterns from the command line
1346  # For patterns in files, prefer the command line first, then any special settings for
1347  # this executor, then fallback to the standard default (atlas_error_mask.db)
1348  if 'ignorePatterns' in self.conf.argdict:
1349  igPat = self.conf.argdict['ignorePatterns'].value
1350  else:
1351  igPat = []
1352  if 'ignoreFiles' in self.conf.argdict:
1353  ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
1354  elif self._errorMaskFiles is not None:
1355  ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
1356  else:
1357  ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)
1358 
1359  # Now actually scan my logfile
1360  msg.info('Scanning logfile {0} for errors in substep {1}'.format(self._logFileName, self._substep))
1361  self._logScan = trfValidation.athenaLogFileReport(logfile=self._logFileName, substepName=self._substep,
1362  ignoreList=ignorePatterns)
1363  worstError = self._logScan.worstError()
1365 
1366 
1367  # In general we add the error message to the exit message, but if it's too long then don't do
1368  # that and just say look in the jobReport
1369  if worstError['firstError']:
1370  if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
1371  if 'CoreDumpSvc' in worstError['firstError']['message']:
1372  exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1373  elif 'G4Exception' in worstError['firstError']['message']:
1374  exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1375  else:
1376  exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
1377  else:
1378  exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
1379  else:
1380  exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
1381 
1382  # If we failed on the rc, then abort now
1383  if deferredException is not None:
1384  # Add any logfile information we have
1385  if worstError['nLevel'] >= stdLogLevels['ERROR']:
1386  deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
1387  # Add the result of memory analysis
1388  if _hasMemLeak:
1389  deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1390  raise deferredException
1391 
1392 
1393  # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
1394  if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
1395  msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
1396  elif worstError['nLevel'] >= stdLogLevels['ERROR']:
1397  self._isValidated = False
1398  msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
1399  # Add the result of memory analysis
1400  if _hasMemLeak:
1401  exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1402  raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
1403  'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
1404 
1405  # Must be ok if we got here!
1406  msg.info('Executor {0} has validated successfully'.format(self.name))
1407  self._isValidated = True
1408 
1409  self._valStop = os.times()
1410  msg.debug('valStop time is {0}'.format(self._valStop))
1411 
1412 
def _isCAEnabled(self):
    """Tell whether this substep is configured with ComponentAccumulator (CA).

    The decision is taken in this order:
      * no ``CA`` entry in the argument dictionary: CA is used exactly when
        this executor has no legacy skeleton;
      * ``CA`` entry present but ``None``: CA is used for all substeps;
      * otherwise CA is used only if the argument resolves to ``True`` for
        this executor's name and substep.

    @return ``True`` when running with CA, ``False`` otherwise
    """
    arguments = self.conf.argdict

    # Without a CA argument, the absence of a legacy skeleton decides
    if 'CA' not in arguments:
        return not self._skeleton

    caArgument = arguments['CA']

    # A CA argument holding None switches CA on for every substep
    if caArgument is None:
        return True

    # Only an explicit True for this particular substep enables CA
    return caArgument.returnMyValue(name=self.name, substep=self.substep) is True
1431 
1432 
1434 
def _prepAthenaCommandLine(self):
    """Build the athena command line for this substep in ``self._cmd``.

    The command starts with the executable (replaced by the ``athena``
    argument when given) and is then extended with:
      * the ``athenaopts`` selected for this substep - the executor name is
        preferred over the substep alias, ``all`` being the last resort;
      * a ``--preloadlib`` option combined with any ``LD_PRELOAD`` update
        held by the environment updater;
      * ``--drop-and-reload`` when this executor tries it and neither CA,
        Valgrind nor a conflicting user option forbids it;
      * ``--threads`` / ``--nprocs`` for non-CA jobs using AthenaMT / AthenaMP,
        unless the user already passed the option;
      * the top options files when a skeleton is defined.
    """
    # Start building up the command line
    if 'athena' in self.conf.argdict:
        self._exe = self.conf.argdict['athena'].value
    self._cmd = [self._exe]

    # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
    currentSubstep = None
    if 'athenaopts' in self.conf.argdict:
        currentName = commonExecutorStepName(self.name)
        if currentName in self.conf.argdict['athenaopts'].value:
            currentSubstep = currentName
            # Join only when name and alias are two different keys: joining a key with
            # itself would duplicate its options and then delete the entry altogether
            if self.substep != currentSubstep and self.substep in self.conf.argdict['athenaopts'].value:
                msg.info('Athenaopts found for {0} and {1}, joining options. '
                         'Consider changing your configuration to use just the name or the alias of the substep.'
                         .format(currentSubstep, self.substep))
                self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
                del self.conf.argdict['athenaopts'].value[self.substep]
                msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
        elif self.substep in self.conf.argdict['athenaopts'].value:
            currentSubstep = self.substep
        elif 'all' in self.conf.argdict['athenaopts'].value:
            currentSubstep = 'all'

    # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
    if 'LD_PRELOAD' in self._envUpdate._envdict:
        preLoadUpdated = False
        if 'athenaopts' in self.conf.argdict and currentSubstep is not None:
            for i, athArg in enumerate(self.conf.argdict['athenaopts'].value[currentSubstep]):
                # This code is pretty ugly as the athenaopts argument contains
                # strings which are really key/value pairs
                if athArg.startswith('--preloadlib'):
                    try:
                        v = athArg.split('=', 1)[1]
                        msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
                        newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
                        self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
                    except Exception as e:
                        msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
                    # Only the first --preloadlib option is considered
                    preLoadUpdated = True
                    break
        if not preLoadUpdated:
            msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
            if 'athenaopts' in self.conf.argdict:
                if currentSubstep is not None:
                    self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
                else:
                    self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
            else:
                self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])

    # Now update command line with the options we have (including any changes to preload)
    if 'athenaopts' in self.conf.argdict:
        if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
            self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
        elif currentSubstep in self.conf.argdict['athenaopts'].value:
            self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])

    if currentSubstep is None:
        currentSubstep = 'all'

    if self._tryDropAndReload:
        if self._isCAEnabled():
            msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
        elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
            msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
        elif 'athenaopts' in self.conf.argdict:
            athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
            # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
            if currentSubstep in self.conf.argdict['athenaopts'].value:
                conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
                if len(conflictOpts) > 0:
                    msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
                else:
                    msg.info('Appending "--drop-and-reload" to athena options')
                    self._cmd.append('--drop-and-reload')
            else:
                msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
                self._cmd.append('--drop-and-reload')
        else:
            # This is the 'standard' case - so drop and reload should be ok
            msg.info('Appending "--drop-and-reload" to athena options')
            self._cmd.append('--drop-and-reload')
    else:
        msg.info('Skipping test for "--drop-and-reload" in this executor')

    if not self._isCAEnabled(): #For CA-jobs, threads and nproc set in runargs file
        # Options the user passed for this substep. The key may be absent (e.g. athenaopts
        # given for other substeps only), in which case there is nothing to conflict with
        substepOpts = []
        if 'athenaopts' in self.conf.argdict and currentSubstep in self.conf.argdict['athenaopts'].value:
            substepOpts = self.conf.argdict['athenaopts'].value[currentSubstep]

        # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
        if self._athenaMT > 0 and not self._disableMT:
            if not any('--threads' in opt for opt in substepOpts):
                self._cmd.append('--threads=%s' % str(self._athenaMT))

        # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
        if self._athenaMP > 0 and not self._disableMP:
            if not any('--nprocs' in opt for opt in substepOpts):
                self._cmd.append('--nprocs=%s' % str(self._athenaMP))

    # Add topoptions
    if self._skeleton or self._skeletonCA:
        self._cmd += self._topOptionsFiles
        msg.info('Updated script arguments with topoptions: %s', self._cmd)
1541 
1542 
1543 
1545  self,
1546  asetup = None,
1547  dbsetup = None,
1548  ossetup = None
1549  ):
1550  self._originalCmd = self._cmd
1551  self._asetup = asetup
1552  self._dbsetup = dbsetup
1553  self._containerSetup = ossetup
1554  self._workdir = os.getcwd()
1555  self._alreadyInContainer = self._workdir.startswith("/srv")
1556  self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
1557  self._setupFile = 'setup.{name}.sh'.format(name = self._name)
1558 
1559  # Create a setupATLAS script
1560  setupATLAS = 'my_setupATLAS.sh'
1561  with open(setupATLAS, 'w') as f:
1562  print("#!/bin/bash", file=f)
1563  print("""
1564 if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1565  export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1566 fi
1567 source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1568  , file=f)
1569  os.chmod(setupATLAS, 0o755)
1570 
1571  msg.debug(
1572  'Preparing wrapper file {wrapperFileName} with '
1573  'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1574  wrapperFileName = self._wrapperFile,
1575  asetupStatus = self._asetup,
1576  dbsetupStatus = self._dbsetup
1577  )
1578  )
1579 
1580  container_cmd = None
1581  try:
1582  with open(self._wrapperFile, 'w') as wrapper:
1583  print('#!/bin/sh', file=wrapper)
1584  if self._containerSetup is not None:
1585  container_cmd = [ os.path.abspath(setupATLAS),
1586  "-c",
1587  self._containerSetup,
1588  "--pwd",
1589  self._workdir,
1590  "-s",
1591  os.path.join('.', self._setupFile),
1592  "-r"]
1593  print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1594  print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
1595  print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1596  file = wrapper)
1597  print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1598  file=wrapper)
1599  print('echo', file=wrapper)
1600 
1601  if asetup:
1602  wfile = wrapper
1603  asetupFile = None
1604  # if the substep is executed within a container, setup athena with a separate script
1605  # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
1606  if self._containerSetup is not None:
1607  asetupFile = open(self._setupFile, 'w')
1608  wfile = asetupFile
1609  print(f'source ./{setupATLAS} -q', file=wfile)
1610  print(f'asetup {asetup}', file=wfile)
1611  print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1612  if dbsetup:
1613  dbroot = path.dirname(dbsetup)
1614  dbversion = path.basename(dbroot)
1615  print("# DBRelease setup", file=wrapper)
1616  print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1617  print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1618  print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1619  print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1620  print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
1621  print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1622  if self._disableMT:
1623  print("# AthenaMT explicitly disabled for this executor", file=wrapper)
1624  if self._disableMP:
1625  print("# AthenaMP explicitly disabled for this executor", file=wrapper)
1626  if self._envUpdate.len > 0:
1627  for envSetting in self._envUpdate.values:
1628  if not envSetting.startswith('LD_PRELOAD'):
1629  print("export", envSetting, file=wrapper)
1630  # If Valgrind is engaged, a serialised Athena configuration file
1631  # is generated for use with a subsequent run of Athena with
1632  # Valgrind.
1633  if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1634  msg.info('Valgrind engaged')
1635  # Define the file name of the serialised Athena
1636  # configuration.
1637  AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1638  name = self._name
1639  )
1640  # Run Athena for generation of its serialised configuration.
1641  print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1642  print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1643  # Generate a Valgrind command, suppressing or using default
1644  # options as requested and extra options as requested.
1645  if 'valgrindDefaultOpts' in self.conf._argdict:
1646  defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
1647  else:
1648  defaultOptions = True
1649  if 'valgrindExtraOpts' in self.conf._argdict:
1650  extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
1651  else:
1652  extraOptionsList = None
1653  msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
1654  msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1655  command = ValgrindCommand(
1656  defaultOptions = defaultOptions,
1657  extraOptionsList = extraOptionsList,
1658  AthenaSerialisedConfigurationFile = \
1659  AthenaSerialisedConfigurationFile
1660  )
1661  msg.debug("Valgrind command: {command}".format(command = command))
1662  print(command, file=wrapper)
1663  # If VTune is engaged, a serialised Athena configuration file
1664  # is generated for use with a subsequent run of Athena with
1665  # VTune.
1666  elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
1667  msg.info('VTune engaged')
1668  # Define the file name of the serialised Athena
1669  # configuration.
1670  AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1671  name = self._name
1672  )
1673  # Run Athena for generation of its serialised configuration.
1674  print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1675  print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1676  # Generate a VTune command, suppressing or ussing default
1677  # options as requested and extra options as requested.
1678  if 'vtuneDefaultOpts' in self.conf._argdict:
1679  defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
1680  else:
1681  defaultOptions = True
1682  if 'vtuneExtraOpts' in self.conf._argdict:
1683  extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
1684  else:
1685  extraOptionsList = None
1686  msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
1687  msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
1688  command = VTuneCommand(
1689  defaultOptions = defaultOptions,
1690  extraOptionsList = extraOptionsList,
1691  AthenaSerialisedConfigurationFile = \
1692  AthenaSerialisedConfigurationFile
1693  )
1694  msg.debug("VTune command: {command}".format(command = command))
1695  print(command, file=wrapper)
1696  else:
1697  msg.info('Valgrind/VTune not engaged')
1698  # run Athena command
1699  print(' '.join(self._cmd), file=wrapper)
1700  os.chmod(self._wrapperFile, 0o755)
1701  except OSError as e:
1702  errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
1703  fileName = self._wrapperFile,
1704  error = e
1705  )
1706  msg.error(errMsg)
1708  trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
1709  errMsg
1710  )
1711  self._cmd = [ path.join('.', self._wrapperFile) ]
1712  if self._containerSetup is not None:
1713  asetupFile.close()
1714  self._cmd = container_cmd + self._cmd
1715 
1716 
1717 
def _smartMerge(self, fileArg):
    """Partition the files of fileArg into merge groups and queue one merger per group.

    Files are taken in the order given by fileArg.value and greedily packed
    into groups whose summed 'file_size' metadata approaches
    fileArg.mergeTargetSize. A negative target size puts every file into a
    single group; a target size of 0 disables merging. Groups holding more
    than one file are handed to fileArg.selfMerge() and the returned object
    is appended to self._myMerger.

    @param fileArg File argument object whose files should be merged
    @return None
    """
    # Only argument types that implement selfMerge() can be merged
    if 'selfMerge' not in dir(fileArg):
        msg.info('Files in {0} cannot merged (no selfMerge() method is implemented)'.format(fileArg.name))
        return

    if fileArg.mergeTargetSize == 0:
        msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
        return

    # List of merge jobs; each entry is the list of files for one job
    groups = [[]]
    groupSize = 0
    for fname in fileArg.value:
        size = fileArg.getSingleMetadata(fname, 'file_size')
        if not isinstance(size, int):
            msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
            return

        openGroup = groups[-1]
        if len(openGroup) == 0:
            # An empty group always takes the next file
            msg.debug('Adding file {0} to current empty merge list'.format(fname))
            openGroup.append(fname)
            groupSize += size
        elif fileArg.mergeTargetSize < 0 or math.fabs(groupSize + size - fileArg.mergeTargetSize) < math.fabs(groupSize - fileArg.mergeTargetSize):
            # Adding the file moves the group closer to the target (or the target is negative)
            msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, openGroup))
            openGroup.append(fname)
            groupSize += size
        else:
            # Close the current group and open a new one with this file
            msg.debug('Starting a new merge list with file {0}'.format(fname))
            groups.append([fname])
            groupSize = size

    msg.debug('First pass splitting will merge files in this way: {0}'.format(groups))

    if len(groups) == 1:
        # A single merge target keeps the filename the transform was started with
        targetNames = [fileArg.originalName]
    else:
        # Several merge targets: derive unique names "<originalName>_<n>", skipping
        # names that already exist on disk. Worker files are numbered with 3 digit
        # padding, so these non-padded names should not collide with them.
        targetNames = []
        suffix = 0
        for _ in groups:
            candidate = fileArg.originalName + '_{0}'.format(suffix)
            while path.exists(candidate):
                suffix += 1
                candidate = fileArg.originalName + '_{0}'.format(suffix)
            targetNames.append(candidate)
            suffix += 1

    # Now actually set up the merges
    for counter, (targetName, mergeGroup) in enumerate(zip(targetNames, groups)):
        msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
        if len(mergeGroup) <= 1:
            msg.info('Skip merging for single file')
            continue
        self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
1781 
1782 
def _targzipJiveXML(self):
    """Pack the JiveXML files found in the current directory into a gzipped tarball.

    The tarball name is the first value of the 'TXT_JIVEXMLTGZ' entry of the
    data dictionary; an existing file of that name is removed first. Only
    directory entries whose whole name matches the JiveXML pattern are added.

    @return None
    """
    import tarfile

    targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
    if os.path.exists(targetTGZName):
        os.remove(targetTGZName)

    # NOTE(review): the '.' before 'xml' is unescaped, so it matches any character
    fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")

    # Force gz compression. The context manager closes the archive even if
    # adding a file fails, so the handle is never leaked.
    with tarfile.open(targetTGZName, "w:gz") as tar:
        for fName in os.listdir('.'):
            # Only accept names that match the pattern in full
            if fNameRE.fullmatch(fName):
                msg.info('adding %s to %s', fName, targetTGZName)
                tar.add(fName)

    msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
1803 
1804 
1805 
1807 
1808  # Here we validate, but will suppress any errors
def validate(self):
    """Run the parent validation but hold back any validation failure.

    A TransformValidationException from the parent class is not propagated:
    it is logged as a warning and its message and exit code are stored on
    the executor, which is marked as not validated.

    @return None
    """
    self.setValStart()
    try:
        super(optionalAthenaExecutor, self).validate()
    except trfExceptions.TransformValidationException as e:
        # In this case we hold this exception until the logfile has been scanned
        msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
        self._isValidated = False
        self._errMsg = e.errMsg
        self._rc = e.errCode
    self._valStop = os.times()
    msg.debug('valStop time is {0}'.format(self._valStop))
1821 
1822 
1824 
def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
             inData = set(), outData = set(), exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True,
             perfMonFile = None, tryDropAndReload = True, extraRunargs = {},
             manualDataDictionary = None, memMonitor = True):
    """Initialise the POOL merge executor.

    Every argument is forwarded unchanged to the parent class initialiser;
    this class only supplies its own defaults for name and skeletonCA.
    """
    parentOptions = {
        'trf': trf,
        'conf': conf,
        'skeletonFile': skeletonFile,
        'skeletonCA': skeletonCA,
        'inData': inData,
        'outData': outData,
        'exe': exe,
        'exeArgs': exeArgs,
        'substep': substep,
        'inputEventTest': inputEventTest,
        'perfMonFile': perfMonFile,
        'tryDropAndReload': tryDropAndReload,
        'extraRunargs': extraRunargs,
        'manualDataDictionary': manualDataDictionary,
        'memMonitor': memMonitor,
    }
    super(POOLMergeExecutor, self).__init__(name, **parentOptions)
1845 
def preExecute(self, input = set(), output = set()):
    """Record the pre-execution start time, then run the parent pre-execution step.

    @param input Set of input data types for this execution
    @param output Set of output data types for this execution
    """
    self.setPreExeStart()
    parentExecutor = super(POOLMergeExecutor, self)
    parentExecutor.preExecute(input=input, output=output)
1849 
1850 
def execute(self):
    """Execute the merge by delegating entirely to the parent executor."""
    # The parent executor manages the athena execution for us
    parentExecutor = super(POOLMergeExecutor, self)
    parentExecutor.execute()
1854 
1855 
1856 
1859 
def preExecute(self, input=set(), output=set()):
    """Expand the DAOD/D2AOD stub output into one output per requested format.

    Unless 'NTUP_PILEUP' is among the outputs, every entry of the 'formats'
    argument gets its own data type ('DAOD_<format>' or 'D2AOD_<format>')
    and a new argPOOLFile registered in the data dictionary. The stub
    'DAOD'/'D2AOD' entries are then removed from the output set, the data
    dictionary and the argument dictionary. Finally the parent
    pre-execution step is run.

    @param input Set of input data types
    @param output Set of output data types; modified in place
    @throws TransformExecutionException if no formats are configured or
            neither 'DAOD' nor 'D2AOD' is among the outputs
    """
    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
    if 'NTUP_PILEUP' not in output:
        # New derivation framework transform uses "formats"
        if 'formats' not in self.conf.argdict:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
                                                            'No derivation configuration specified')

        if ('DAOD' not in output) and ('D2AOD' not in output):
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
                                                            'No base name for DAOD output')

        formatList = self.conf.argdict['formats'].value
        for reduction in formatList:
            if 'DAOD' in output:
                dataType = 'DAOD_' + reduction
                # Default tag is the format name itself; the first augmentation
                # "<formats>:<tag>" whose left-hand side contains this format
                # overrides it. Starting from the default keeps outputName
                # defined even when the augmentations list is empty.
                outputTag = reduction
                if 'augmentations' in self.conf.argdict:
                    for val in self.conf.argdict['augmentations'].value:
                        if reduction in val.split(':')[0]:
                            outputTag = val.split(':')[1]
                            break
                outputName = 'DAOD_' + outputTag + '.' + self.conf.argdict['outputDAODFile'].value[0]

            if 'D2AOD' in output:
                dataType = 'D2AOD_' + reduction
                outputName = 'D2AOD_' + reduction + '.' + self.conf.argdict['outputD2AODFile'].value[0]

            msg.info('Adding reduction output type {0}'.format(dataType))
            output.add(dataType)
            newReduction = trfArgClasses.argPOOLFile(outputName, io='output', runarg=True, type='AOD',
                                                     name=reduction)
            # References to _trf - can this be removed?
            self.conf.dataDictionary[dataType] = newReduction

        # Clean up the stub file from the executor input and the transform's data dictionary
        # (we don't remove the actual argFile instance)
        if 'DAOD' in output:
            output.remove('DAOD')
            del self.conf.dataDictionary['DAOD']
            del self.conf.argdict['outputDAODFile']
        if 'D2AOD' in output:
            output.remove('D2AOD')
            del self.conf.dataDictionary['D2AOD']
            del self.conf.argdict['outputD2AODFile']

        msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
        msg.info('Input/Output: {0}/{1}'.format(input, output))

    msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
    msg.info('Input/Output: {0}/{1}'.format(input, output))
    super(reductionFrameworkExecutor, self).preExecute(input, output)
1917 
1918 
1919 
def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']),
             exe='DQHistogramMerge.py', exeArgs = [], memMonitor = True):
    """Initialise the DQ histogram merge executor.

    Sets the name of the text file that will list the histogram files to
    merge, then forwards all arguments to the parent class initialiser.
    """
    # Name of the file written in preExecute() with one input file per line
    self._histMergeList = 'HISTMergeList.txt'

    parentOptions = dict(name=name, trf=trf, conf=conf, inData=inData, outData=outData,
                         exe=exe, exeArgs=exeArgs, memMonitor=memMonitor)
    super(DQMergeExecutor, self).__init__(**parentOptions)
1928 
1929 
def preExecute(self, input = set(), output = set()):
    """Write the merge list file and assemble the merge command line.

    After the parent pre-execution step, every input file is written to
    self._histMergeList and the command is extended with: the list file,
    the single output file, the two 'True'/'False' flags for
    run_post_processing and is_incremental_merge, and any
    excludeHist/excludeDir options present in the argument dictionary.

    @param input Set of input data types
    @param output Set of output data types; must contain exactly one entry
    @throws TransformExecutionException if output does not hold exactly one data type
    """
    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    super(DQMergeExecutor, self).preExecute(input=input, output=output)

    # Write the list of files to be merged
    with open(self._histMergeList, 'w') as mergeListFile:
        for dataType in input:
            fileArg = self.conf.dataDictionary[dataType]
            for fname in fileArg.value:
                fileArg._getNumberOfEvents([fname])
                print(fname, file=mergeListFile)

    self._cmd.append(self._histMergeList)

    # Add the output file
    if len(output) != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
    outDataType = list(output)[0]
    self._cmd.append(self.conf.dataDictionary[outDataType].value[0])

    # Positional True/False flags, in the order post-processing then incremental merge.
    # NOTE(review): the flag is the truthiness of the argdict entry itself, not of
    # its .value - confirm this is intended.
    for flagName in ("run_post_processing", "is_incremental_merge"):
        flagText = 'True' if self.conf._argdict.get(flagName, False) else 'False'
        self._cmd.append(flagText)

    for optionName in ("excludeHist", "excludeDir"):
        if optionName not in self.conf._argdict:
            continue
        self._cmd.append("--{0}={1}".format(optionName, self.conf._argdict[optionName]))
1966 
1967 
def validate(self):
    """Run the parent validation, then scan the logfile for errors it missed.

    @throws TransformValidationException if the logfile cannot be scanned
    @throws TransformLogfileErrorException if the worst logfile message is at
            ERROR level or above, unless it is exactly ERROR and the
            ignoreErrors argument is True
    """
    self.setValStart()
    super(DQMergeExecutor, self).validate()

    exitErrorMessage = ''
    # Base class validation successful, Now scan the logfile for missed errors.
    try:
        logScan = trfValidation.scriptLogFileReport(self._logFileName)
        worstError = logScan.worstError()

        # In general we add the error message to the exit message, but if it's too long then don't do
        # that and just say look in the jobReport
        if worstError['firstError']:
            if len(worstError['firstError']['message']) > logScan._msgLimit:
                exitErrorMessage = "Long {0} message at line {1}" \
                                   " (see jobReport for further details)".format(worstError['level'],
                                                                                  worstError['firstError']['firstLine'])
            else:
                exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
                                                                           worstError['firstError']['message'])
    except OSError as e:
        exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        raise trfExceptions.TransformValidationException(exitCode,
                                                         'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))

    if worstError['nLevel'] == stdLogLevels['ERROR'] and (
            'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
        msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')

    elif worstError['nLevel'] >= stdLogLevels['ERROR']:
        self._isValidated = False
        msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
        exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))

    # Must be ok if we got here!
    msg.info('Executor {0} has validated successfully'.format(self.name))
    self._isValidated = True

    self._valStop = os.times()
    msg.debug('valStop time is {0}'.format(self._valStop))
2009 
2010 
def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']),
             exe='DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor = True):
    """Initialise the DQM post-processing executor.

    Sets the histogram merge list file name, then forwards all arguments
    to the parent class initialiser.
    """
    self._histMergeList = 'HISTMergeList.txt'

    parentOptions = dict(name=name, trf=trf, conf=conf, inData=inData, outData=outData,
                         exe=exe, exeArgs=exeArgs, memMonitor=memMonitor)
    super(DQMPostProcessExecutor, self).__init__(**parentOptions)
2019 
2020 
def preExecute(self, input = set(), output = set()):
    """Write the wrapper parameters to args.json and point the command at it.

    After the parent pre-execution step, the input files are collected as
    "<dataset>#<filename>" strings, the wrapper parameters are dumped as
    JSON to 'args.json' and '--argJSON=args.json' is appended to the command.

    @param input Set of input data types
    @param output Set of output data types; must contain exactly one entry
    @throws TransformExecutionException if output does not hold exactly one data type
    """
    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    super(DQMPostProcessExecutor, self).preExecute(input=input, output=output)

    # Build input file list (typically only one)
    dsName = self.conf.argdict["inputHISTFile"].dataset
    inputList = []
    for dataType in input:
        for fname in self.conf.dataDictionary[dataType].value:
            # If no dataset name is given, guess it from the first four
            # dot-separated fields of the file name
            if not dsName:
                dsName = ".".join(fname.split('.')[0:4])
            inputList.append("#".join([dsName, fname]))

    if len(output) != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
    outDataType = list(output)[0]

    argdict = self.conf._argdict

    def flagText(key):
        # The wrapper takes its switches as the strings "True"/"False"
        if argdict.get(key, False):
            return "True"
        return "False"

    # Build argument json:
    # T0 uses keys: 'allowCOOLUpload', 'doWebDisplay', 'incrementalMode', 'inputHistFiles', 'mergeParams', 'outputHistFile', 'postProcessing', 'reverseFileOrder', 'servers'
    # more keys: runNumber, streamName,projectTag,filepaths,productionMode,skipMerge
    wrapperParams = {}
    wrapperParams["inputHistFiles"] = inputList
    wrapperParams["outputHistFile"] = dsName + "#" + self.conf.dataDictionary[outDataType].value[0]
    wrapperParams["incrementalMode"] = flagText("is_incremental_merge")
    wrapperParams["postProcessing"] = flagText("run_post_processing")
    wrapperParams["doWebDisplay"] = flagText("doWebDisplay")
    wrapperParams["allowCOOLUpload"] = flagText("allowCOOLUpload")
    wrapperParams["mergeParams"] = ""

    # NOTE(review): the key written here is "server" while the key list above
    # names 'servers', and the argdict entry is stored without .value - confirm
    # both against the wrapper script.
    if "servers" in argdict:
        wrapperParams["server"] = argdict["servers"]

    wrapperParams["mergeParams"] += "".join(
        " --{0}={1}".format(optionName, argdict[optionName])
        for optionName in ("excludeHist", "excludeDir")
        if optionName in argdict
    )

    with open("args.json", "w") as argsFile:
        json.dump(wrapperParams, argsFile)

    self._cmd.append("--argJSON=args.json")
2065 
2066 
2067 
def validate(self):
    """Run the parent validation, then scan the logfile for errors it missed.

    @throws TransformValidationException if the logfile cannot be scanned
    @throws TransformLogfileErrorException if the worst logfile message is at
            ERROR level or above, unless it is exactly ERROR and the
            ignoreErrors argument is True
    """
    self.setValStart()
    super(DQMPostProcessExecutor, self).validate()

    exitErrorMessage = ''
    # Base class validation successful, Now scan the logfile for missed errors.
    try:
        logScan = trfValidation.scriptLogFileReport(self._logFileName)
        worstError = logScan.worstError()

        # In general we add the error message to the exit message, but if it's too long then don't do
        # that and just say look in the jobReport
        if worstError['firstError']:
            if len(worstError['firstError']['message']) > logScan._msgLimit:
                exitErrorMessage = "Long {0} message at line {1}" \
                                   " (see jobReport for further details)".format(worstError['level'],
                                                                                  worstError['firstError']['firstLine'])
            else:
                exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
                                                                           worstError['firstError']['message'])
    except OSError as e:
        exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        raise trfExceptions.TransformValidationException(exitCode,
                                                         'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))

    if worstError['nLevel'] == stdLogLevels['ERROR'] and (
            'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
        msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')

    elif worstError['nLevel'] >= stdLogLevels['ERROR']:
        self._isValidated = False
        msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
        exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))

    # Must be ok if we got here!
    msg.info('Executor {0} has validated successfully'.format(self.name))
    self._isValidated = True

    self._valStop = os.times()
    msg.debug('valStop time is {0}'.format(self._valStop))
2109 
2111 
def preExecute(self, input = set(), output = set()):
    """Build the ntuple merge command, then run the parent pre-execution step.

    The command is "<exe> -f <output file> <input files...>", where the
    executable falls back to 'hadd' when none was configured.

    @param input Set of input data types
    @param output Set of output data types; must contain exactly one entry
    @throws TransformExecutionException if output does not hold exactly one data type
    """
    self.setPreExeStart()
    msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    # Basic command; "-f" allows overwrite of the output file
    if self._exe is None:
        self._exe = 'hadd'
    self._cmd = [self._exe, "-f"]

    # The output file comes first on the command line
    if len(output) != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
    outDataType = list(output)[0]
    outputFile = self.conf.dataDictionary[outDataType].value[0]
    self._cmd.append(outputFile)

    # Followed by all the files to be merged
    for dataType in input:
        filesToMerge = self.conf.dataDictionary[dataType].value
        self._cmd.extend(filesToMerge)

    super(NTUPMergeExecutor, self).preExecute(input=input, output=output)
2133 
2134 
2135 
2137 
def preExecute(self, input = set(), output = set()):
    """Prepare the bytestream merge: mask empty inputs, write the file list, build the command.

    With maskEmptyInputs set, input files whose 'nentries' metadata is not a
    positive integer are left out of the merge. If every file is masked, the
    emptyStubFile (when given and present on disk) is used as output instead
    of running the merge. The remaining files are written to
    '<name>.list' and the command is set to
    "<exe> <list file> 0 <output base name>".

    @param input Set of input data types; the first entry is used
    @param output Set of output data types; the first entry is used
    @throws TransformExecutionException if all inputs are empty and no stub
            file is available, if the list file cannot be written, or if the
            output name lacks the '._0001.data' suffix and renaming is not allowed
    """
    self.setPreExeStart()
    self._inputBS = list(input)[0]
    self._outputBS = list(output)[0]
    self._maskedFiles = []
    self._useStubFile = False

    inputFiles = self.conf.dataDictionary[self._inputBS].value
    if 'maskEmptyInputs' in self.conf.argdict and self.conf.argdict['maskEmptyInputs'].value is True:
        eventfullFiles = []
        for fname in inputFiles:
            nEvents = self.conf.dataDictionary[self._inputBS].getSingleMetadata(fname, 'nentries')
            msg.debug('Found {0} events in file {1}'.format(nEvents, fname))
            if isinstance(nEvents, int) and nEvents > 0:
                eventfullFiles.append(fname)
        self._maskedFiles = list(set(inputFiles) - set(eventfullFiles))
        if len(self._maskedFiles) > 0:
            msg.info('The following input files are masked because they have 0 events: {0}'.format(' '.join(self._maskedFiles)))
            if len(eventfullFiles) == 0:
                if 'emptyStubFile' in self.conf.argdict and path.exists(self.conf.argdict['emptyStubFile'].value):
                    self._useStubFile = True
                    msg.info("All input files are empty - will use stub file {0} as output".format(self.conf.argdict['emptyStubFile'].value))
                else:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
                                                                    'All input files had zero events - aborting BS merge')

    # Write the list of input files to a text file, so that testMergedFiles can swallow it
    self._mergeBSFileList = '{0}.list'.format(self._name)
    self._mergeBSLogfile = '{0}.out'.format(self._name)
    maskedLookup = set(self._maskedFiles)  # constant-time membership test in the loop below
    try:
        with open(self._mergeBSFileList, 'w') as BSFileList:
            for fname in inputFiles:
                if fname not in maskedLookup:
                    print(fname, file=BSFileList)
    except OSError as e:
        errMsg = 'Got an error when writing list of BS files to {0}: {1}'.format(self._mergeBSFileList, e)
        msg.error(errMsg)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'), errMsg)

    # Hope that we were given a correct filename...
    expectedSuffix = '._0001.data'
    self._outputFilename = self.conf.dataDictionary[self._outputBS].value[0]
    if self._outputFilename.endswith(expectedSuffix):
        self._doRename = False
        # Strip only the trailing suffix; splitting on the suffix would cut the
        # name at its first occurrence if it happened to appear earlier as well
        self._outputFilename = self._outputFilename[:-len(expectedSuffix)]
    elif self.conf.argdict['allowRename'].value is True:
        # OK, non-fatal, we go for a renaming
        msg.info('Output filename does not end in "._0001.data" will proceed, but be aware that the internal filename metadata will be wrong')
        self._doRename = True
    else:
        # No rename allowed, so we are dead...
        errmsg = 'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'), errmsg)

    # Set the correct command for execution
    self._cmd = [self._exe, self._mergeBSFileList, '0', self._outputFilename]

    super(bsMergeExecutor, self).preExecute(input=input, output=output)
2193 
def execute(self):
    """Run the merge, or fake the execution when the empty stub file is used.

    In stub mode no process is started: the stub file is renamed to the
    output filename (if the names differ), memory monitoring is switched
    off, and the executor is marked as executed with return code 0.
    """
    if not self._useStubFile:
        super(bsMergeExecutor, self).execute()
        return

    # Need to fake execution!
    self._exeStart = os.times()
    msg.debug('exeStart time is {0}'.format(self._exeStart))
    msg.info("Using stub file for empty BS output - execution is fake")
    stubFile = self.conf.argdict['emptyStubFile'].value
    if self._outputFilename != stubFile:
        os.rename(self.conf.argdict['emptyStubFile'].value, self._outputFilename)
    self._memMonitor = False
    self._hasExecuted = True
    self._rc = 0
    self._exeStop = os.times()
    msg.debug('exeStop time is {0}'.format(self._exeStop))
2209 
def postExecute(self):
    """Rename the merged file to the requested output name if needed, then run the parent step.

    Nothing is renamed when the stub file was used. Otherwise, when a rename
    was flagged in preExecute(), the file '<output base name>._0001.data' is
    renamed to the output filename held in the data dictionary.

    @throws TransformExecutionException if the rename fails
    """
    if not self._useStubFile and self._doRename:
        self._expectedOutput = self._outputFilename + '._0001.data'
        finalName = self.conf.dataDictionary[self._outputBS].value[0]
        msg.info('Renaming {0} to {1}'.format(self._expectedOutput, finalName))
        try:
            os.rename(self._expectedOutput, finalName)
        except OSError as e:
            # Report the file that was actually being renamed, and keep the
            # original OSError attached as the cause
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                            'Exception raised when renaming {0} to {1}: {2}'.format(self._expectedOutput, finalName, e)) from e
    super(bsMergeExecutor, self).postExecute()
2222 
2223 
2224 
2226 
def preExecute(self, input = set(), output = set()):
    """Generate the python wrapper script for archiving or unarchiving and set the command.

    For exe == 'zip' a script 'zip_wrapper.py' is written that stores the
    inputDataFile files (or, for inputs that are themselves zip files, their
    extracted contents) uncompressed in the outputArchFile archive and
    removes each input once stored. For exe == 'unarchive' a script
    'unarchive_wrapper.py' is written that extracts every inputArchFile
    archive into the configured path. The command becomes
    "python <wrapper script>", then the parent pre-execution step is run.

    @param input Set of input data types
    @param output Set of output data types
    @throws TransformExecutionException if the output archive name is missing,
            an input to unarchive is not a zip file, or the wrapper script
            cannot be written
    """
    self.setPreExeStart()
    self._memMonitor = False

    #archiving
    if self._exe == 'zip':
        if 'outputArchFile' not in self.conf.argdict:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_MISSING'), 'Missing output file name')

        self._cmd = ['python']
        try:
            with open('zip_wrapper.py', 'w') as zip_wrapper:
                print("import zipfile, os, shutil", file=zip_wrapper)
                if os.path.exists(self.conf.argdict['outputArchFile'].value[0]):
                    #appending input file(s) to existing archive
                    print("zf = zipfile.ZipFile('{}', mode='a', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
                else:
                    #creating new archive
                    print("zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
                print("for f in {}:".format(self.conf.argdict['inputDataFile'].value), file=zip_wrapper)
                #is_zipfile gives false positives (as of python 3.7.0), so also check the name for ".zip"
                print("    if zipfile.is_zipfile(f) and '.zip' in f:", file=zip_wrapper)
                print("        archive = zipfile.ZipFile(f, mode='r')", file=zip_wrapper)
                # The generated script uses print() calls so that it is valid under Python 3
                print("        print('Extracting input zip file {0} to temporary directory {1}'.format(f,'tmp'))", file=zip_wrapper)
                print("        archive.extractall('tmp')", file=zip_wrapper)
                print("        archive.close()", file=zip_wrapper)
                # remove stuff as soon as it is saved to output in order to save disk space at worker node
                print("        if os.access(f, os.F_OK):", file=zip_wrapper)
                print("            print('Removing input zip file {}'.format(f))", file=zip_wrapper)
                print("            os.unlink(f)", file=zip_wrapper)
                print("        if os.path.isdir('tmp'):", file=zip_wrapper)
                print("            for root, dirs, files in os.walk('tmp'):", file=zip_wrapper)
                print("                for name in files:", file=zip_wrapper)
                print("                    print('Zipping {}'.format(name))", file=zip_wrapper)
                print("                    zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
                print("            shutil.rmtree('tmp')", file=zip_wrapper)
                print("    else:", file=zip_wrapper)
                print("        print('Zipping {}'.format(os.path.basename(f)))", file=zip_wrapper)
                print("        zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
                print("        if os.access(f, os.F_OK):", file=zip_wrapper)
                print("            print('Removing input file {}'.format(f))", file=zip_wrapper)
                print("            os.unlink(f)", file=zip_wrapper)
                print("zf.close()", file=zip_wrapper)
            os.chmod('zip_wrapper.py', 0o755)
        except OSError as e:
            errMsg = 'error writing zip wrapper {fileName}: {error}'.format(fileName = 'zip_wrapper.py',
                                                                            error = e
                                                                            )
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
                                                            errMsg
                                                            )
        self._cmd.append('zip_wrapper.py')

    #unarchiving
    elif self._exe == 'unarchive':
        import zipfile
        for infile in self.conf.argdict['inputArchFile'].value:
            if not zipfile.is_zipfile(infile):
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
                                                                'An input file is not a zip archive - aborting unpacking')
        self._cmd = ['python']
        try:
            with open('unarchive_wrapper.py', 'w') as unarchive_wrapper:
                print("import zipfile", file=unarchive_wrapper)
                print("for f in {}:".format(self.conf.argdict['inputArchFile'].value), file=unarchive_wrapper)
                print("    archive = zipfile.ZipFile(f, mode='r')", file=unarchive_wrapper)
                # NOTE(review): the argdict entry is formatted directly, not its .value - confirm
                print("    path = '{}'".format(self.conf.argdict['path']), file=unarchive_wrapper)
                print("    print('Extracting archive {0} to {1}'.format(f,path))", file=unarchive_wrapper)
                print("    archive.extractall(path)", file=unarchive_wrapper)
                print("    archive.close()", file=unarchive_wrapper)
            os.chmod('unarchive_wrapper.py', 0o755)
        except OSError as e:
            errMsg = 'error writing unarchive wrapper {fileName}: {error}'.format(fileName = 'unarchive_wrapper.py',
                                                                                  error = e
                                                                                  )
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
                                                            errMsg
                                                            )
        self._cmd.append('unarchive_wrapper.py')
    super(archiveExecutor, self).preExecute(input=input, output=output)
2310 
python.trfExe.logscanExecutor._logFileName
_logFileName
Definition: trfExe.py:509
grepfile.info
info
Definition: grepfile.py:38
python.trfExe.transformExecutor._isValidated
_isValidated
Definition: trfExe.py:175
python.trfExe.echoExecutor.execute
def execute(self)
Definition: trfExe.py:579
python.trfExe.athenaExecutor._errorMaskFiles
_errorMaskFiles
Definition: trfExe.py:890
python.trfExe.transformExecutor._hasValidated
_hasValidated
Definition: trfExe.py:174
python.trfExe.athenaExecutor
Definition: trfExe.py:840
python.trfExe.athenaExecutor._runtimeRunargs
_runtimeRunargs
Definition: trfExe.py:887
python.trfExe.executorConfig.addToArgdict
def addToArgdict(self, key, value)
Add a new object to the argdict.
Definition: trfExe.py:117
python.trfUtils.isInteractiveEnv
def isInteractiveEnv()
Definition: trfUtils.py:703
python.trfExe.DQMPostProcessExecutor
Specialist execution class for DQM post-processing of histograms.
Definition: trfExe.py:2011
python.trfExe.athenaExecutor.disableMT
def disableMT(self)
Definition: trfExe.py:946
python.trfExe.executorConfig._argdict
_argdict
Definition: trfExe.py:63
python.trfExe.transformExecutor._athenaConcurrentEvents
_athenaConcurrentEvents
Definition: trfExe.py:194
python.trfExe.scriptExecutor._exeLogFile
_exeLogFile
Definition: trfExe.py:696
python.trfExe.transformExecutor.errMsg
def errMsg(self)
Definition: trfExe.py:303
python.trfExe.athenaExecutor._setupFile
_setupFile
Definition: trfExe.py:1552
python.trfExe.athenaExecutor._prepAthenaCommandLine
def _prepAthenaCommandLine(self)
Prepare the correct command line to be used to invoke athena.
Definition: trfExe.py:1433
python.trfExe.athenaExecutor._extraRunargs
_extraRunargs
Definition: trfExe.py:886
python.trfExe.transformExecutor.execute
def execute(self)
Definition: trfExe.py:474
python.trfExe.DQMPostProcessExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2021
python.trfMPTools.athenaMPOutputHandler
def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks=False, argdict={})
Handle AthenaMP outputs, updating argFile instances to real.
Definition: trfMPTools.py:70
python.trfExe.DQMergeExecutor.validate
def validate(self)
Definition: trfExe.py:1968
python.trfExe.echoExecutor.__init__
def __init__(self, name='Echo', trf=None)
Definition: trfExe.py:573
python.trfExe.transformExecutor.myMerger
def myMerger(self)
Now define properties for these data members.
Definition: trfExe.py:206
python.trfExe.transformExecutor
Executors always only even execute a single step, as seen by the transform.
Definition: trfExe.py:126
python.trfExe.athenaExecutor._inputDataTypeCountCheck
_inputDataTypeCountCheck
Definition: trfExe.py:891
python.trfExe.DQMergeExecutor.__init__
def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']), exe='DQHistogramMerge.py', exeArgs=[], memMonitor=True)
Definition: trfExe.py:1921
python.trfExe.transformExecutor._memFullFile
_memFullFile
Definition: trfExe.py:190
python.trfExe.NTUPMergeExecutor
Specialist execution class for merging NTUPLE files.
Definition: trfExe.py:2110
python.trfExe.logscanExecutor.validate
def validate(self)
Definition: trfExe.py:517
python.trfExe.executorConfig.setFromTransform
def setFromTransform(self, trf)
Set configuration properties from the parent transform.
Definition: trfExe.py:112
python.trfExe.POOLMergeExecutor.__init__
def __init__(self, name='hybridPOOLMerge', trf=None, conf=None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton', inData=set(), outData=set(), exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, manualDataDictionary=None, memMonitor=True)
Initialise hybrid POOL merger athena executor.
Definition: trfExe.py:1835
python.trfArgClasses.argPOOLFile
POOL file class.
Definition: trfArgClasses.py:1425
python.trfExe.transformExecutor.preExeStartTimes
def preExeStartTimes(self)
Definition: trfExe.py:331
python.trfExceptions.TransformSetupException
Setup exceptions.
Definition: trfExceptions.py:42
python.trfExe.POOLMergeExecutor.execute
def execute(self)
Definition: trfExe.py:1851
python.trfExe.athenaExecutor._skeleton
_skeleton
Definition: trfExe.py:905
python.trfExe.athenaExecutor._skeletonCA
_skeletonCA
Definition: trfExe.py:897
python.trfExe.transformExecutor.cpuTimeTotal
def cpuTimeTotal(self)
Definition: trfExe.py:429
vtune_athena.format
format
Definition: vtune_athena.py:14
python.trfExe.transformExecutor.validate
def validate(self)
Definition: trfExe.py:488
python.trfExe.executorConfig.dataDictionary
def dataDictionary(self)
Definition: trfExe.py:78
python.trfExe.transformExecutor._validation
_validation
Definition: trfExe.py:312
python.trfExe.transformExecutor.inData
inData
Definition: trfExe.py:250
python.trfExe.athenaExecutor._athenaMPReadEventOrders
_athenaMPReadEventOrders
Definition: trfExe.py:1108
python.trfExe.DQMergeExecutor
Specialist execution class for merging DQ histograms.
Definition: trfExe.py:1920
python.trfExe.dummyExecutor.execute
def execute(self)
Definition: trfExe.py:601
python.trfExe.logscanExecutor.__init__
def __init__(self, name='Logscan')
Definition: trfExe.py:506
python.trfExe.DQMergeExecutor._histMergeList
_histMergeList
Definition: trfExe.py:1923
python.trfExe.logscanExecutor
Special executor that will enable a logfile scan as part of its validation.
Definition: trfExe.py:505
python.trfExe.transformExecutor._myMerger
_myMerger
Definition: trfExe.py:201
python.trfExe.transformExecutor._rc
_rc
Definition: trfExe.py:170
PyJobTransforms.trfJobOptions
Contains functions related to Athena Job Options files.
python.trfExe.transformExecutor._valStop
_valStop
Definition: trfExe.py:187
index
Definition: index.py:1
python.trfExe.DQMPostProcessExecutor.validate
def validate(self)
Definition: trfExe.py:2068
python.trfExe.scriptExecutor.validate
def validate(self)
Definition: trfExe.py:800
python.trfExe.transformExecutor.athenaMP
def athenaMP(self)
Definition: trfExe.py:451
python.trfExe.bsMergeExecutor
Specialise the script executor to deal with the BS merge oddity of excluding empty DRAWs.
Definition: trfExe.py:2136
python.trfExe.athenaExecutor._onlyMPWithRunargs
_onlyMPWithRunargs
Definition: trfExe.py:896
min
constexpr double min()
Definition: ap_fixedTest.cxx:26
python.trfUtils.calcCpuTime
def calcCpuTime(start, stop)
Definition: trfUtils.py:1683
python.trfExe.athenaExecutor._asetup
_asetup
Definition: trfExe.py:1546
python.trfUtils.VTuneCommand
def VTuneCommand(defaultOptions=True, extraOptionsList=None, AthenaSerialisedConfigurationFile="athenaConf.pkl", returnFormat="string")
return VTune command @detail This function returns a VTune command for use with Athena.
Definition: trfUtils.py:1642
python.trfExe.athenaExecutor._topOptionsFiles
_topOptionsFiles
Write the skeleton file and prep athena.
Definition: trfExe.py:1180
python.trfExe.transformExecutor._hasExecuted
_hasExecuted
Definition: trfExe.py:169
python.trfExe.transformExecutor._eventCount
_eventCount
Definition: trfExe.py:191
python.trfExe.executorConfig.firstExecutor
def firstExecutor(self)
Definition: trfExe.py:86
python.trfExe.scriptExecutor._echoOutput
_echoOutput
Definition: trfExe.py:631
python.trfExe.transformExecutor.output
def output(self)
Definition: trfExe.py:284
python.trfUtils.ValgrindCommand
def ValgrindCommand(defaultOptions=True, extraOptionsList=None, AthenaSerialisedConfigurationFile="athenaConf.pkl", returnFormat="string")
Definition: trfUtils.py:1576
python.trfExe.logscanExecutor._logScan
_logScan
TODO: This is a cut'n'paste from the athenaExecutor. We really should factorise this and use it common...
Definition: trfExe.py:536
python.trfExe._encoding_stream
def _encoding_stream(s)
Definition: trfExe.py:44
python.trfExe.transformExecutor.validationWallTime
def validationWallTime(self)
Definition: trfExe.py:422
python.trfExe.POOLMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:1846
python.trfExe.scriptExecutor._echologger
_echologger
Definition: trfExe.py:691
python.trfExe.scriptExecutor._echostream
_echostream
Definition: trfExe.py:701
python.trfMTTools.detectAthenaMTThreads
def detectAthenaMTThreads(argdict={}, currentSubstep='', legacyThreadingRelease=False)
Detect if AthenaMT has been requested.
Definition: trfMTTools.py:23
python.trfExe.reductionFrameworkExecutor
Specialist executor to manage the handling of multiple implicit input and output files within the der...
Definition: trfExe.py:1858
python.trfExe.bsMergeExecutor._mergeBSLogfile
_mergeBSLogfile
Definition: trfExe.py:2164
python.trfExe.transformExecutor.postExeWallTime
def postExeWallTime(self)
Definition: trfExe.py:408
athena.value
value
Definition: athena.py:124
python.trfExe.transformExecutor.memStats
def memStats(self)
Definition: trfExe.py:393
python.trfExe.reductionFrameworkExecutor.preExecute
def preExecute(self, input=set(), output=set())
Take inputDAODFile and setup the actual outputs needed in this job.
Definition: trfExe.py:1861
python.trfExe.transformExecutor.preExeCpuTime
def preExeCpuTime(self)
Definition: trfExe.py:351
PyJobTransforms.trfArgClasses
Transform argument class definitions.
python.trfExe.transformExecutor.__init__
def __init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Base class initialiser for transform executors.
Definition: trfExe.py:137
python.trfExe.transformExecutor._outData
_outData
Definition: trfExe.py:144
PyJobTransforms.trfExitCodes
Module for transform exit codes.
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.trfExe.scriptExecutor._exe
_exe
Definition: trfExe.py:621
python.trfEnv.environmentUpdate
Class holding the update to an environment that will be passed on to an executor.
Definition: trfEnv.py:15
python.trfExe.transformExecutor.extraMetadata
def extraMetadata(self)
Definition: trfExe.py:291
intersection
std::vector< std::string > intersection(std::vector< std::string > &v1, std::vector< std::string > &v2)
Definition: compareFlatTrees.cxx:25
python.trfExe.transformExecutor.setValStart
def setValStart(self)
Definition: trfExe.py:465
python.trfUtils.reportEventsPassedSimFilter
def reportEventsPassedSimFilter(log)
summarize events passed the ISF_SimEventFilter @detail this function sums up all events passed the IS...
Definition: trfUtils.py:1716
python.trfExe.transformExecutor._extraMetadata
_extraMetadata
Definition: trfExe.py:180
python.trfExe.bsMergeExecutor._expectedOutput
_expectedOutput
Definition: trfExe.py:2214
python.trfExe.athenaExecutor.validate
def validate(self)
Definition: trfExe.py:1310
python.trfExe.athenaExecutor._originalCmd
_originalCmd
Definition: trfExe.py:1545
python.trfExe.executorConfig.addToDataDictionary
def addToDataDictionary(self, key, value)
Add a new object to the dataDictionary.
Definition: trfExe.py:121
python.trfExe.DQMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:1930
python.trfExe.executorConfig.totalExecutorSteps
def totalExecutorSteps(self)
Definition: trfExe.py:102
python.trfExe.archiveExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2227
python.trfExe.transformExecutor.valStartTimes
def valStartTimes(self)
Definition: trfExe.py:343
python.trfExe.transformExecutor.hasExecuted
def hasExecuted(self)
Definition: trfExe.py:295
python.trfExe.athenaExecutor._envUpdate
_envUpdate
Add input/output file information - this can't be done in init as we don't know what our inputs and o...
Definition: trfExe.py:1218
python.trfExe.bsMergeExecutor._doRename
_doRename
Definition: trfExe.py:2178
python.trfExe.transformExecutor._exeStop
_exeStop
Definition: trfExe.py:186
pool::DbPrintLvl::setLevel
void setLevel(MsgLevel l)
Definition: DbPrint.h:32
python.trfExe.dummyExecutor
Definition: trfExe.py:594
python.trfExe.athenaExecutor._wrapperFile
_wrapperFile
Definition: trfExe.py:1551
python.trfExceptions.TransformExecutionException
Base class for execution exceptions.
Definition: trfExceptions.py:62
python.trfExe.transformExecutor.wallTime
def wallTime(self)
Definition: trfExe.py:386
PixelModuleFeMask_create_db.remove
string remove
Definition: PixelModuleFeMask_create_db.py:83
python.trfExe.scriptExecutor.execute
def execute(self)
Definition: trfExe.py:722
python.trfExe.transformExecutor.wallTimeTotal
def wallTimeTotal(self)
Definition: trfExe.py:436
python.trfExe.transformExecutor._valStart
_valStart
Definition: trfExe.py:187
python.trfExe.scriptExecutor.exeArgs
def exeArgs(self)
Definition: trfExe.py:649
python.trfExe.executorConfig._firstExecutor
_firstExecutor
Definition: trfExe.py:65
python.trfExe.transformExecutor.inDataUpdate
def inDataUpdate(self, value)
Definition: trfExe.py:244
python.trfUtils.setupDBRelease
def setupDBRelease(setup)
Run a DBRelease setup.
Definition: trfUtils.py:543
python.trfExe.executorConfig.__init__
def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False)
Configuration for an executor.
Definition: trfExe.py:62
python.trfExe.executorConfig._executorStep
_executorStep
Definition: trfExe.py:66
python.trfExe.athenaExecutor.name
name
Definition: trfExe.py:1284
python.trfExe.DQMPostProcessExecutor.__init__
def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']), exe='DQM_Tier0Wrapper_tf.py', exeArgs=[], memMonitor=True)
Definition: trfExe.py:2012
python.trfExe.transformExecutor._containerSetup
_containerSetup
Definition: trfExe.py:198
python.trfExe.transformExecutor.eventCount
def eventCount(self)
Definition: trfExe.py:443
python.trfExe.athenaExecutor._workdir
_workdir
Definition: trfExe.py:1549
fillPileUpNoiseLumi.next
next
Definition: fillPileUpNoiseLumi.py:52
python.trfExe.scriptExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:657
PyJobTransforms.trfMPTools
Utilities for handling AthenaMP jobs.
python.trfExe.logscanExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:511
python.trfExe.athenaExecutor._smartMerge
def _smartMerge(self, fileArg)
Manage smart merging of output files.
Definition: trfExe.py:1719
python.trfUtils.asetupReleaseIsOlderThan
def asetupReleaseIsOlderThan(asetup_string, major, minor=None)
Test (to the best of our knowledge) if the asetup release is older than a major, minor version number...
Definition: trfUtils.py:303
python.trfValidation.ignorePatterns
Class of patterns that can be ignored from athena logfiles.
Definition: trfValidation.py:100
python.trfExe.logscanExecutor._errorMaskFiles
_errorMaskFiles
Definition: trfExe.py:508
python.trfExe.transformExecutor.outDataUpdate
def outDataUpdate(self, value)
Definition: trfExe.py:264
python.trfExe.transformExecutor.conf
conf
Executor configuration:
Definition: trfExe.py:162
python.trfExe.athenaExecutor._tryDropAndReload
_tryDropAndReload
Definition: trfExe.py:885
python.trfExe.athenaExecutor._writeAthenaWrapper
def _writeAthenaWrapper(self, asetup=None, dbsetup=None, ossetup=None)
Write a wrapper script which runs asetup and then Athena.
Definition: trfExe.py:1544
python.trfExe.transformExecutor.input
def input(self)
Definition: trfExe.py:275
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:195
python.trfExe.scriptExecutor
Definition: trfExe.py:618
python.trfExe.athenaExecutor._literalRunargs
_literalRunargs
Definition: trfExe.py:888
python.trfExe.executorConfig._dataDictionary
_dataDictionary
Definition: trfExe.py:64
add
bool add(const std::string &hname, TKey *tobj)
Definition: fastadd.cxx:55
python.trfArgClasses.argSubstepList
Argument class for substep lists, suitable for preExec/postExec.
Definition: trfArgClasses.py:2036
python.trfExe.athenaExecutor._athenaMPStrategy
_athenaMPStrategy
Definition: trfExe.py:1114
python.trfValidation.athenaLogFileReport
Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGL...
Definition: trfValidation.py:211
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.trfExe.transformExecutor.postExeCpuTime
def postExeCpuTime(self)
Definition: trfExe.py:401
python.trfExe.transformExecutor._athenaMP
_athenaMP
Definition: trfExe.py:192
python.trfExe.executorConfig.executorStep
def executorStep(self)
Definition: trfExe.py:94
python.trfExe.echoExecutor
Definition: trfExe.py:572
python.trfExe.transformExecutor._exeStart
_exeStart
Definition: trfExe.py:186
python.trfExe.athenaExecutor._logScan
_logScan
Definition: trfExe.py:1361
python.trfExe.transformExecutor.exeStopTimes
def exeStopTimes(self)
Definition: trfExe.py:339
python.trfExe.athenaExecutor._disableMP
_disableMP
Definition: trfExe.py:893
python.trfExe.scriptExecutor._memSummaryFile
_memSummaryFile
Definition: trfExe.py:748
python.trfExe.scriptExecutor.__init__
def __init__(self, name='Script', trf=None, conf=None, inData=set(), outData=set(), exe=None, exeArgs=None, memMonitor=True)
Definition: trfExe.py:619
python.trfExe.athenaExecutor.disableMP
def disableMP(self)
Definition: trfExe.py:938
python.trfExe.scriptExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:787
PyJobTransforms.trfValidation
Validation control for job transforms.
beamspotman.dir
string dir
Definition: beamspotman.py:623
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
python.trfExceptions.TransformValidationException
Group of validation exceptions.
Definition: trfExceptions.py:50
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
python.trfExe.transformExecutor.dbMonitor
def dbMonitor(self)
Definition: trfExe.py:455
python.trfExe.transformExecutor.memAnalysis
def memAnalysis(self)
Definition: trfExe.py:397
python.trfExe.scriptExecutor._buildStandardCommand
def _buildStandardCommand(self)
Definition: trfExe.py:705
python.trfExe.executorConfig.argdict
def argdict(self)
Definition: trfExe.py:70
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.trfExe.optionalAthenaExecutor
Athena executor where failure is not considered fatal.
Definition: trfExe.py:1806
python.trfExe.athenaExecutor._isCAEnabled
def _isCAEnabled(self)
Check if running with CA.
Definition: trfExe.py:1413
python.trfExe.bsMergeExecutor._outputFilename
_outputFilename
Definition: trfExe.py:2176
python.trfExe.executorConfig._totalExecutorSteps
_totalExecutorSteps
Definition: trfExe.py:67
python.trfExe.transformExecutor._athenaMT
_athenaMT
Definition: trfExe.py:193
python.trfExe.athenaExecutor._substep
_substep
Definition: trfExe.py:883
python.trfExe.dummyExecutor.__init__
def __init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Base class initialiser for transform executors.
Definition: trfExe.py:595
python.trfExe.transformExecutor.valStopTimes
def valStopTimes(self)
Definition: trfExe.py:347
PyJobTransforms.trfMTTools
Utilities for handling AthenaMT jobs.
python.trfExe.transformExecutor._preExeStart
_preExeStart
Definition: trfExe.py:185
python.trfExe.transformExecutor.trf
def trf(self)
Definition: trfExe.py:224
python.trfExe.archiveExecutor
Archive transform.
Definition: trfExe.py:2225
python.trfExe.athenaExecutor._jobOptionsTemplate
_jobOptionsTemplate
Definition: trfExe.py:917
python.trfExe.athenaExecutor._onlyMP
_onlyMP
Do we need to run asetup first? For a certain substep (e.g.
Definition: trfExe.py:894
python.trfExe.athenaExecutor._athenaMPEventOrdersFile
_athenaMPEventOrdersFile
Definition: trfExe.py:1106
python.trfExe.transformExecutor.sysTime
def sysTime(self)
Definition: trfExe.py:379
python.trfExeStepTools.commonExecutorStepName
def commonExecutorStepName(name)
Definition: trfExeStepTools.py:7
python.trfExe.scriptExecutor._input
_input
Definition: trfExe.py:661
python.trfExe.transformExecutor.cpuTime
def cpuTime(self)
Definition: trfExe.py:365
python.trfMPTools.detectAthenaMPProcs
def detectAthenaMPProcs(argdict={}, currentSubstep='', legacyThreadingRelease=False)
Detect if AthenaMP has been requested.
Definition: trfMPTools.py:27
python.trfExe.athenaExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:1233
python.trfExe.athenaExecutor.onlyMT
def onlyMT(self)
Definition: trfExe.py:962
python.trfUtils.calcWallTime
def calcWallTime(start, stop)
Definition: trfUtils.py:1691
python.trfExe.transformExecutor._alreadyInContainer
_alreadyInContainer
Definition: trfExe.py:197
python.trfUtils.cvmfsDBReleaseCheck
def cvmfsDBReleaseCheck(dbrelease)
Validate a DBRelease exists on cvmfs and return the path to the setup script.
Definition: trfUtils.py:569
Trk::open
@ open
Definition: BinningType.h:40
python.trfExe.DQMPostProcessExecutor._histMergeList
_histMergeList
Definition: trfExe.py:2014
python.trfExe.transformExecutor.isValidated
def isValidated(self)
Definition: trfExe.py:319
python.trfExe.transformExecutor._errMsg
_errMsg
Definition: trfExe.py:171
python.trfExe.transformExecutor._inData
_inData
Definition: trfExe.py:143
python.trfExe.transformExecutor.setPreExeStart
def setPreExeStart(self)
Definition: trfExe.py:460
ActsTrk::detail::MakeDerivedVariant::extend
constexpr std::variant< Args..., T > extend(const std::variant< Args... > &, const T &)
Definition: MakeDerivedVariant.h:17
python.trfExe.executorConfig
Definition: trfExe.py:56
python.trfExe.athenaExecutor.__init__
def __init__(self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
Initialise athena executor.
Definition: trfExe.py:881
PyJobTransforms.trfUtils
Transform utility functions.
python.trfExe.athenaExecutor._targzipJiveXML
def _targzipJiveXML(self)
Definition: trfExe.py:1783
python.trfExe.athenaExecutor.onlyMP
def onlyMP(self)
Definition: trfExe.py:954
python.trfExe.transformExecutor.preExeWallTime
def preExeWallTime(self)
Definition: trfExe.py:358
python.trfExe.bsMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2138
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
python.trfExe.POOLMergeExecutor
Definition: trfExe.py:1823
PyJobTransforms.trfLogger
Logging configuration for ATLAS job transforms.
python.trfExe.transformExecutor.first
def first(self)
Definition: trfExe.py:324
python.trfExe.transformExecutor.rc
def rc(self)
Definition: trfExe.py:299
python.trfExe.transformExecutor.usrTime
def usrTime(self)
Definition: trfExe.py:372
python.trfExe.transformExecutor.outData
outData
Definition: trfExe.py:270
python.trfExe.bsMergeExecutor._inputBS
_inputBS
Definition: trfExe.py:2140
python.trfExe.transformExecutor._dbMonitor
_dbMonitor
Definition: trfExe.py:195
python.trfExceptions.TransformLogfileErrorException
Exception class for validation failures detected by parsing logfiles.
Definition: trfExceptions.py:58
python.trfExe.optionalAthenaExecutor.validate
def validate(self)
Definition: trfExe.py:1809
python.trfExe.scriptExecutor._memMonitor
_memMonitor
Definition: trfExe.py:636
python.trfExe.athenaExecutor._athenaMPWorkerTopDir
_athenaMPWorkerTopDir
Definition: trfExe.py:1104
python.trfExe.athenaExecutor.inputDataTypeCountCheck
def inputDataTypeCountCheck(self)
Definition: trfExe.py:926
python.trfExe.bsMergeExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:2210
pickleTool.object
object
Definition: pickleTool.py:30
python.trfExe.bsMergeExecutor._useStubFile
_useStubFile
Definition: trfExe.py:2143
python.trfExe.scriptExecutor._cmd
_cmd
Definition: trfExe.py:634
python.trfExe.athenaExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:969
str
Definition: BTagTrackIpAccessor.cxx:11
python.trfExe.athenaExecutor._inputEventTest
_inputEventTest
Definition: trfExe.py:884
python.trfExe.transformExecutor.name
def name(self)
Definition: trfExe.py:210
python.trfExe.NTUPMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2112
python.trfExe.transformExecutor.validation
def validation(self)
Definition: trfExe.py:307
python.trfExe.transformExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:485
python.trfExe.transformExecutor.substep
def substep(self)
Definition: trfExe.py:218
python.trfExe.athenaExecutor._onlyMT
_onlyMT
Definition: trfExe.py:895
python.trfExe.scriptExecutor._exeArgs
_exeArgs
Definition: trfExe.py:624
python.trfExe.scriptExecutor._logFileName
_logFileName
Definition: trfExe.py:670
python.trfExe.athenaExecutor._dbsetup
_dbsetup
Definition: trfExe.py:1547
python.trfExe.scriptExecutor.exe
def exe(self)
Definition: trfExe.py:640
python.trfExe.transformExecutor.hasValidated
def hasValidated(self)
Definition: trfExe.py:315
python.trfExe.transformExecutor.doAll
def doAll(self, input=set(), output=set())
Convenience function.
Definition: trfExe.py:498
python.trfExe.scriptExecutor._output
_output
Definition: trfExe.py:662
python.trfExe.transformExecutor._memStats
_memStats
Definition: trfExe.py:188
python.trfExe.transformExecutor._name
_name
Definition: trfExe.py:140
python.trfExe.bsMergeExecutor._maskedFiles
_maskedFiles
Definition: trfExe.py:2142
python.trfUtils.forceToAlphaNum
def forceToAlphaNum(string)
Strip a string down to alpha-numeric characters only.
Definition: trfUtils.py:443
python.trfExe.athenaExecutor._disableMT
_disableMT
Definition: trfExe.py:892
python.trfExe.transformExecutor._memLeakResult
_memLeakResult
Definition: trfExe.py:189
python.trfExe.athenaExecutor._perfMonFile
_perfMonFile
Definition: trfExe.py:900
WriteBchToCool.update
update
Definition: WriteBchToCool.py:67
python.trfExe.athenaExecutor._dataArgs
_dataArgs
Definition: trfExe.py:889
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
python.trfValidation.scriptLogFileReport
Definition: trfValidation.py:677
python.trfUtils.unpackDBRelease
def unpackDBRelease(tarball, dbversion=None)
Ensure that the DBRelease tarball has been unpacked.
Definition: trfUtils.py:520
python.trfExe.bsMergeExecutor._outputBS
_outputBS
Definition: trfExe.py:2141
python.trfExe.transformExecutor.validationCpuTime
def validationCpuTime(self)
Definition: trfExe.py:415
python.trfExe.transformExecutor._resimevents
_resimevents
Definition: trfExe.py:196
python.trfUtils.asetupReport
def asetupReport()
Return a string with a report of the current athena setup.
Definition: trfUtils.py:223
python.trfExe.athenaExecutor.substep
def substep(self)
Definition: trfExe.py:934
python.trfExe.transformExecutor._trf
_trf
Definition: trfExe.py:231
python.trfExe.transformExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:470
python.trfExe.athenaExecutor._athenaMPFileReport
_athenaMPFileReport
Definition: trfExe.py:1105
python.trfExe.transformExecutor.exeStartTimes
def exeStartTimes(self)
Definition: trfExe.py:335
python.trfExe.transformExecutor.reSimEvent
def reSimEvent(self)
Definition: trfExe.py:447
python.trfExe.bsMergeExecutor._mergeBSFileList
_mergeBSFileList
Definition: trfExe.py:2163
python.trfValidation.eventMatch
Small class used for validating event counts between input and output files.
Definition: trfValidation.py:911
python.trfExe.bsMergeExecutor.execute
def execute(self)
Definition: trfExe.py:2194