ATLAS Offline Software
trfExe.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2 
3 
8 
9 import json
10 import math
11 import os
12 import os.path as path
13 import re
14 import signal
15 import subprocess
16 import sys
17 import time
18 
19 import logging
20 from fnmatch import fnmatch
21 msg = logging.getLogger(__name__)
22 
23 from PyJobTransforms.trfJobOptions import JobOptionsTemplate
24 from PyJobTransforms.trfUtils import asetupReport, asetupReleaseIsOlderThan, unpackDBRelease, setupDBRelease, \
25  cvmfsDBReleaseCheck, forceToAlphaNum, \
26  ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
27 from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix
28 from PyJobTransforms.trfExitCodes import trfExit
29 from PyJobTransforms.trfLogger import stdLogLevels
30 from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler
31 from PyJobTransforms.trfMTTools import detectAthenaMTThreads
32 
33 import PyJobTransforms.trfExceptions as trfExceptions
34 import PyJobTransforms.trfValidation as trfValidation
35 import PyJobTransforms.trfArgClasses as trfArgClasses
36 import PyJobTransforms.trfEnv as trfEnv
37 import PyJobTransforms.trfMPITools as mpi
38 
39 
40 # Depending on the setting of LANG, sys.stdout may end up with ascii or ansi
41 # encoding, rather than utf-8. But Athena uses unicode for some log messages
42 # (another example of Gell-Mann's totalitarian principle) and that will result
43 # in a crash with python 3. In such a case, force the use of a utf-8 encoded
44 # output stream instead.
46  enc = s.encoding.lower()
47  if enc.find('ascii') >= 0 or enc.find('ansi') >= 0:
48  return open (s.fileno(), 'w', encoding='utf-8')
49  return s
50 
51 
52 
58 
59 
63  def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False):
64  self._argdict = argdict
65  self._dataDictionary = dataDictionary
66  self._firstExecutor = firstExecutor
67  self._executorStep = -1
69 
    ## @brief Transform argument dictionary used to configure this executor
    @property
    def argdict(self):
        return self._argdict

    @argdict.setter
    def argdict(self, value):
        self._argdict = value

    ## @brief Map from data type name to the corresponding data argument object
    @property
    def dataDictionary(self):
        return self._dataDictionary

    @dataDictionary.setter
    def dataDictionary(self, value):
        self._dataDictionary = value

    ## @brief Flag marking whether this executor runs first in the transform chain
    @property
    def firstExecutor(self):
        return self._firstExecutor

    @firstExecutor.setter
    def firstExecutor(self, value):
        self._firstExecutor = value

    ## @brief Index of the current executor step (-1 until a step is set)
    @property
    def executorStep(self):
        return self._executorStep

    @executorStep.setter
    def executorStep(self, value):
        self._executorStep = value
101 
102  @property
104  return self._totalExecutorSteps
105 
106  @totalExecutorSteps.setter
107  def totalExecutorSteps(self, value):
108  self._totalExecutorSteps = value
109 
110 
113  def setFromTransform(self, trf):
114  self._argdict = trf.argdict
115  self._dataDictionary = trf.dataDictionary
116 
117 
    ## @brief Add or replace a single transform argument
    def addToArgdict(self, key, value):
        self._argdict[key] = value


    ## @brief Add or replace a single data type entry
    def addToDataDictionary(self, key, value):
        self._dataDictionary[key] = value
124 
125 
126 
128 
129 
    ## @brief Base class executor constructor
    #  @param name Executor name (forced to alphanumeric for safety)
    #  @param trf Parent transform, used to populate @c conf when given
    #  @param conf An executorConfig instance (a fresh one is created when None)
    #  @param inData Set of input data types this executor can start from
    #  @param outData Set of output data types this executor can produce
    def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
        # Some information to produce helpful log messages

        self._name = forceToAlphaNum(name)
        # Data this executor can start from and produce
        # Note we transform NULL to inNULL and outNULL as a convenience
        self._inData = set(inData)
        self._outData = set(outData)
        if 'NULL' in self._inData:
            self._inData.remove('NULL')
            self._inData.add('inNULL')
        if 'NULL' in self._outData:
            self._outData.remove('NULL')
            self._outData.add('outNULL')

        # It's forbidden for an executor to consume and produce the same datatype
        dataOverlap = self._inData & self._outData
        if len(dataOverlap) > 0:
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
                                                        'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self._name, ' '.join(dataOverlap)))

        # Executor configuration: take the one given, else build a fresh one,
        # populating it from the parent transform when available
        if conf is not None:
            self.conf = conf
        else:
            self.conf = executorConfig()
            if trf is not None:
                self.conf.setFromTransform(trf)

        # Execution status
        self._hasExecuted = False
        self._rc = -1
        self._errMsg = None

        # Validation status
        self._hasValidated = False
        self._isValidated = False

        # Extra metadata
        # This dictionary holds extra metadata for this executor which will be
        # provided in job reports
        self._extraMetadata = {}

        # Timing and monitoring state; the *Start/*Stop members hold os.times()
        # tuples once the corresponding phase has run
        self._preExeStart = None
        self._exeStart = self._exeStop = None
        self._valStart = self._valStop = None
        self._memStats = {}
        self._memLeakResult = {}
        self._memFullFile = None
        self._eventCount = None
        self._athenaMP = 0
        self._athenaMT = 0
        self._dbMonitor = None
        self._resimevents = None
        self._containerSetup = None

        # Holder for execution information about any merges done by this executor in MP mode
        self._myMerger = []
203 
204 
205 
    ## @brief List of merge executions this executor performed in AthenaMP mode
    @property
    def myMerger(self):
        return self._myMerger

    ## @brief Executor name
    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value
217 
218  @property
219  def substep(self):
220  if '_substep' in dir(self):
221  return self._substep
222  return None
223 
224  @property
225  def trf(self):
226  if '_trf' in dir(self):
227  return self._trf
228  return None
229 
230  @trf.setter
231  def trf(self, value):
232  self._trf = value
233 
234  @property
235  def inData(self):
236 
237  if '_inData' in dir(self):
238  return self._inData
239  return None
240 
241  @inData.setter
242  def inData(self, value):
243  self._inData = set(value)
244 
245  def inDataUpdate(self, value):
246 
247  if '_inData' in dir(self):
248  self._inData.update(value)
249  else:
250 
251  self.inData = value
252 
253 
254  @property
255  def outData(self):
256 
257  if '_outData' in dir(self):
258  return self._outData
259  return None
260 
261  @outData.setter
262  def outData(self, value):
263  self._outData = set(value)
264 
265  def outDataUpdate(self, value):
266 
267  if '_outData' in dir(self):
268  self._outData.update(value)
269  else:
270 
271  self.outData = value
272 
    ## @brief Input file arguments given to this executor, or None before preExecute
    @property
    def input(self):
        if '_input' in dir(self):
            return self._input
        return None

    ## @brief Output file arguments given to this executor, or None before preExecute
    @property
    def output(self):
        if '_output' in dir(self):
            return self._output
        return None
290 
    ## @brief Extra metadata dictionary published in job reports
    @property
    def extraMetadata(self):
        return self._extraMetadata

    ## @brief True once execute() has run
    @property
    def hasExecuted(self):
        return self._hasExecuted

    ## @brief Return code of the last execution (-1 before execution)
    @property
    def rc(self):
        return self._rc

    ## @brief Error message from execution/validation, if any
    @property
    def errMsg(self):
        return self._errMsg

    ## @brief Validation object/flag attached to this executor
    @property
    def validation(self):
        return self._validation

    @validation.setter
    def validation(self, value):
        self._validation = value

    ## @brief True once validate() has run
    @property
    def hasValidated(self):
        return self._hasValidated

    ## @brief True when validation succeeded
    @property
    def isValidated(self):
        return self._isValidated

    ## @brief First-executor flag, or None if never set
    @property
    def first(self):
        if hasattr(self, '_first'):
            return self._first
        else:
            return None
330 
    ## @brief os.times() snapshot taken at the start of preExecute, or None
    @property
    def preExeStartTimes(self):
        return self._preExeStart

    ## @brief os.times() snapshot taken at the start of execute, or None
    @property
    def exeStartTimes(self):
        return self._exeStart

    ## @brief os.times() snapshot taken at the end of execute, or None
    @property
    def exeStopTimes(self):
        return self._exeStop

    ## @brief os.times() snapshot taken at the start of validate, or None
    @property
    def valStartTimes(self):
        return self._valStart

    ## @brief os.times() snapshot taken at the end of validate, or None
    @property
    def valStopTimes(self):
        return self._valStop
350 
351  @property
352  def preExeCpuTime(self):
353  if self._preExeStart and self._exeStart:
354  return calcCpuTime(self._preExeStart, self._exeStart)
355  else:
356  return None
357 
358  @property
359  def preExeWallTime(self):
360  if self._preExeStart and self._exeStart:
361  return calcWallTime(self._preExeStart, self._exeStart)
362  else:
363  return None
364 
365  @property
366  def cpuTime(self):
367  if self._exeStart and self._exeStop:
368  return calcCpuTime(self._exeStart, self._exeStop)
369  else:
370  return None
371 
372  @property
373  def usrTime(self):
374  if self._exeStart and self._exeStop:
375  return self._exeStop[2] - self._exeStart[2]
376  else:
377  return None
378 
379  @property
380  def sysTime(self):
381  if self._exeStart and self._exeStop:
382  return self._exeStop[3] - self._exeStart[3]
383  else:
384  return None
385 
386  @property
387  def wallTime(self):
388  if self._exeStart and self._exeStop:
389  return calcWallTime(self._exeStart, self._exeStop)
390  else:
391  return None
392 
393  @property
394  def memStats(self):
395  return self._memStats
396 
397  @property
398  def memAnalysis(self):
399  return self._memLeakResult
400 
401  @property
402  def postExeCpuTime(self):
403  if self._exeStop and self._valStart:
404  return calcCpuTime(self._exeStop, self._valStart)
405  else:
406  return None
407 
408  @property
409  def postExeWallTime(self):
410  if self._exeStop and self._valStart:
411  return calcWallTime(self._exeStop, self._valStart)
412  else:
413  return None
414 
415  @property
416  def validationCpuTime(self):
417  if self._valStart and self._valStop:
418  return calcCpuTime(self._valStart, self._valStop)
419  else:
420  return None
421 
422  @property
424  if self._valStart and self._valStop:
425  return calcWallTime(self._valStart, self._valStop)
426  else:
427  return None
428 
429  @property
430  def cpuTimeTotal(self):
431  if self._preExeStart and self._valStop:
432  return calcCpuTime(self._preExeStart, self._valStop)
433  else:
434  return None
435 
436  @property
437  def wallTimeTotal(self):
438  if self._preExeStart and self._valStop:
439  return calcWallTime(self._preExeStart, self._valStop)
440  else:
441  return None
442 
443  @property
444  def eventCount(self):
445  return self._eventCount
446 
447  @property
448  def reSimEvent(self):
449  return self._resimevents
450 
451  @property
452  def athenaMP(self):
453  return self._athenaMP
454 
455  @property
456  def dbMonitor(self):
457  return self._dbMonitor
458 
459 
460  # set start times, if not set already
461  def setPreExeStart(self):
462  if self._preExeStart is None:
463  self._preExeStart = os.times()
464  msg.debug('preExeStart time is {0}'.format(self._preExeStart))
465 
466  def setValStart(self):
467  if self._valStart is None:
468  self._valStart = os.times()
469  msg.debug('valStart time is {0}'.format(self._valStart))
470 
    ## @brief Base preExecute: records the start timestamp and logs
    def preExecute(self, input = set(), output = set()):
        self.setPreExeStart()
        msg.info('Preexecute for %s', self._name)
474 
475  def execute(self):
476  self._exeStart = os.times()
477  msg.debug('exeStart time is {0}'.format(self._exeStart))
478  msg.info('Starting execution of %s', self._name)
479  self._hasExecuted = True
480  self._rc = 0
481  self._errMsg = ''
482  msg.info('%s executor returns %d', self._name, self._rc)
483  self._exeStop = os.times()
484  msg.debug('preExeStop time is {0}'.format(self._exeStop))
485 
    ## @brief Base postExecute hook: only logs (subclasses do the real work)
    def postExecute(self):
        msg.info('Postexecute for %s', self._name)
488 
    ## @brief Base validation: performs no checks and always marks the executor valid
    def validate(self):
        self.setValStart()
        self._hasValidated = True
        msg.info('Executor %s has no validation function - assuming all ok', self._name)
        self._isValidated = True
        self._errMsg = ''
        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))
497 
498 
    ## @brief Convenience method running the full executor lifecycle in order
    def doAll(self, input=set(), output=set()):
        self.preExecute(input, output)
        self.execute()
        self.postExecute()
        self.validate()
504 
505 
    ## @brief Executor that only scans an existing logfile during validation
    def __init__(self, name = 'Logscan'):
        super(logscanExecutor, self).__init__(name=name)
        # Optional list of error mask files used during the logfile scan
        self._errorMaskFiles = None
        # Logfile to scan; picked up from the 'logfile' argument in preExecute
        self._logFileName = None
511 
512  def preExecute(self, input = set(), output = set()):
513  self.setPreExeStart()
514  msg.info('Preexecute for %s', self._name)
515  if 'logfile' in self.conf.argdict:
516  self._logFileName = self.conf.argdict['logfile'].value
517 
    ## @brief Validate by scanning the configured logfile for errors
    #  @raises trfExceptions.TransformLogfileErrorException when an ERROR (or
    #          worse) is found, unless ignoreErrors=True was given
    def validate(self):
        self.setValStart()
        msg.info("Starting validation for {0}".format(self._name))
        if self._logFileName:
            # Build the ignore-pattern list: explicit ignoreFiles win over the
            # executor's error mask files, which win over the packaged default
            if 'ignorePatterns' in self.conf.argdict:
                igPat = self.conf.argdict['ignorePatterns'].value
            else:
                igPat = []
            if 'ignoreFiles' in self.conf.argdict:
                ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
            elif self._errorMaskFiles is not None:
                ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
            else:
                ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)

            # Now actually scan my logfile
            msg.info('Scanning logfile {0} for errors'.format(self._logFileName))
            self._logScan = trfValidation.athenaLogFileReport(logfile = self._logFileName, ignoreList = ignorePatterns)
            worstError = self._logScan.worstError()

            # In general we add the error message to the exit message, but if it's too long then don't do
            # that and just say look in the jobReport
            if worstError['firstError']:
                if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
                    if 'CoreDumpSvc' in worstError['firstError']['message']:
                        exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
                    elif 'G4Exception' in worstError['firstError']['message']:
                        exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
                    else:
                        exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
                else:
                    exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
            else:
                exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])

            # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
            if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
                msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
            elif worstError['nLevel'] >= stdLogLevels['ERROR']:
                self._isValidated = False
                msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
                raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
                                                                   'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))

        # Must be ok if we got here!
        msg.info('Executor {0} has validated successfully'.format(self.name))
        self._isValidated = True
        self._errMsg = ''

        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))
571 
572 
    ## @brief Executor that just echoes the transform argument dictionary
    def __init__(self, name = 'Echo', trf = None):

        # We are only changing the default name here
        super(echoExecutor, self).__init__(name=name, trf=trf)
578 
579 
580  def execute(self):
581  self._exeStart = os.times()
582  msg.debug('exeStart time is {0}'.format(self._exeStart))
583  msg.info('Starting execution of %s', self._name)
584  msg.info('Transform argument dictionary now follows:')
585  for k, v in self.conf.argdict.items():
586  print("%s = %s" % (k, v))
587  self._hasExecuted = True
588  self._rc = 0
589  self._errMsg = ''
590  msg.info('%s executor returns %d', self._name, self._rc)
591  self._exeStop = os.times()
592  msg.debug('exeStop time is {0}'.format(self._exeStop))
593 
594 
    ## @brief Executor that creates empty dummy files for its output data types
    def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):

        # We are only changing the default name here
        super(dummyExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
600 
601 
602  def execute(self):
603  self._exeStart = os.times()
604  msg.debug('exeStart time is {0}'.format(self._exeStart))
605  msg.info('Starting execution of %s', self._name)
606  for type in self._outData:
607  for k, v in self.conf.argdict.items():
608  if type in k:
609  msg.info('Creating dummy output file: {0}'.format(self.conf.argdict[k].value[0]))
610  open(self.conf.argdict[k].value[0], 'a').close()
611  self._hasExecuted = True
612  self._rc = 0
613  self._errMsg = ''
614  msg.info('%s executor returns %d', self._name, self._rc)
615  self._exeStop = os.times()
616  msg.debug('exeStop time is {0}'.format(self._exeStop))
617 
618 
    ## @brief Executor that runs an arbitrary script or binary
    #  @param exe Name of the executable to run
    #  @param exeArgs List of transform argument names whose values are appended
    #         to the command line
    #  @param memMonitor Whether to spawn prmon alongside the job
    def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData = set(),
                 exe = None, exeArgs = None, memMonitor = True):
        # Name of the script we want to execute
        self._exe = exe

        # With arguments (currently this means paste in the corresponding _argdict entry)
        self._exeArgs = exeArgs

        super(scriptExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)

        self._extraMetadata.update({'script' : exe})

        # Decide if we echo script output to stdout
        self._echoOutput = False

        # Can either be written by base class or child
        self._cmd = None

        self._memMonitor = memMonitor
639 
    ## @brief Executable name; updating it also refreshes the extra metadata
    @property
    def exe(self):
        return self._exe

    @exe.setter
    def exe(self, value):
        self._exe = value
        self._extraMetadata['script'] = value

    ## @brief List of transform argument names pasted onto the command line
    @property
    def exeArgs(self):
        return self._exeArgs

    @exeArgs.setter
    def exeArgs(self, value):
        self._exeArgs = value
#        self._extraMetadata['scriptArgs'] = value
657 
    ## @brief Prepare the command line and logging handlers before execution
    #  Decides whether to echo the script output to stdout (TRF_ECHO/TRF_NOECHO
    #  environment overrides first, otherwise interactive and Tier-0
    #  environments echo) and sets up a dedicated logger writing to log.<name>.
    def preExecute(self, input = set(), output = set()):
        self.setPreExeStart()
        msg.debug('scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

        self._input = input
        self._output = output

        # Build the standard command only when a child class has not set one already
        if self._cmd is None:
            self._buildStandardCommand()
        msg.info('Will execute script as %s', self._cmd)

        # Define this here to have it for environment detection messages
        self._logFileName = "log.{0}".format(self._name)

        # Explicit environment overrides win over the interactive/Tier-0 heuristics
        if 'TRF_ECHO' in os.environ:
            msg.info('TRF_ECHO envvar is set - enabling command echoing to stdout')
            self._echoOutput = True
        elif 'TRF_NOECHO' in os.environ:
            msg.info('TRF_NOECHO envvar is set - disabling command echoing to stdout')
            self._echoOutput = False
        # PS1 is for sh, bash; prompt is for tcsh and zsh
        elif isInteractiveEnv():
            msg.info('Interactive environment detected (stdio or stdout is a tty) - enabling command echoing to stdout')
            self._echoOutput = True
        elif 'TZHOME' in os.environ:
            msg.info('Tier-0 environment detected - enabling command echoing to stdout')
            self._echoOutput = True
        if self._echoOutput is False:
            msg.info('Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self._name, self._logFileName))

        # Now setup special loggers for logging execution messages to stdout and file
        self._echologger = logging.getLogger(self._name)
        self._echologger.setLevel(logging.INFO)
        self._echologger.propagate = False

        encargs = {'encoding' : 'utf-8'}
        self._exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
        self._exeLogFile.setFormatter(logging.Formatter('%(asctime)s %(message)s', datefmt='%H:%M:%S'))
        self._echologger.addHandler(self._exeLogFile)

        if self._echoOutput:
            self._echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
            self._echostream.setFormatter(logging.Formatter('%(name)s %(asctime)s %(message)s', datefmt='%H:%M:%S'))
            self._echologger.addHandler(self._echostream)
705 
707  if self._exe:
708  self._cmd = [self.exe, ]
709  else:
710  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
711  'No executor set in {0}'.format(self.__class__.__name__))
712  for arg in self.exeArgs:
713  if arg in self.conf.argdict:
714  # If we have a list then add each element to our list, else just str() the argument value
715  # Note if there are arguments which need more complex transformations then
716  # consider introducing a special toExeArg() method.
717  if isinstance(self.conf.argdict[arg].value, list):
718  self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
719  else:
720  self._cmd.append(str(self.conf.argdict[arg].value))
721 
722 
    ## @brief Run the prepared command, streaming its output to the step logger
    #  Spawns prmon as a memory monitor when enabled; honours the execOnly
    #  argument by replacing the transform process via execvp.
    #  @raises trfExceptions.TransformExecutionException when launching the
    #          command raises OSError
    def execute(self):
        self._hasExecuted = True
        msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))

        self._exeStart = os.times()
        msg.debug('exeStart time is {0}'.format(self._exeStart))
        if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
            msg.info('execOnly flag is set - execution will now switch, replacing the transform')
            os.execvp(self._cmd[0], self._cmd)

        encargs = {'encoding' : 'utf8'}
        try:
            # if we are already inside a container, then
            # must map /srv of the nested container to /srv of the parent container:
            # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
            if self._alreadyInContainer and self._containerSetup is not None:
                msg.info("chdir /srv to launch a nested container for the substep")
                os.chdir("/srv")
            p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
            # go back to the original working directory immediately after to store prmon in it
            if self._alreadyInContainer and self._containerSetup is not None:
                msg.info("chdir {} after launching the nested container".format(self._workdir))
                os.chdir(self._workdir)

            if self._memMonitor:
                try:
                    self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
                    self._memFullFile = 'prmon.full.' + self._name
                    memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
                                         '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
                                         '--interval', '30']
                    mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
                    # TODO - link mem.full.current to mem.full.SUBSTEP
                except Exception as e:
                    msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
                    self._memMonitor = False

            # Echo the child's merged stdout/stderr line by line until it exits
            while p.poll() is None:
                line = p.stdout.readline()
                if line:
                    self._echologger.info(line.rstrip())
            # Hoover up remaining buffered output lines
            for line in p.stdout:
                self._echologger.info(line.rstrip())

            self._rc = p.returncode
            msg.info('%s executor returns %d', self._name, self._rc)
            self._exeStop = os.times()
            msg.debug('exeStop time is {0}'.format(self._exeStop))
        except OSError as e:
            errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
        finally:
            if self._memMonitor:
                try:
                    # Ask prmon to write its final summary, then give it up to ~1s to exit
                    mem_proc.send_signal(signal.SIGUSR1)
                    countWait = 0
                    while (not mem_proc.poll()) and countWait < 10:
                        time.sleep(0.1)
                        countWait += 1
                except OSError:
                    pass
786 
787 
788  def postExecute(self):
789  if hasattr(self._exeLogFile, 'close'):
790  self._exeLogFile.close()
791  if self._memMonitor:
792  try:
793  memFile = open(self._memSummaryFile)
794  self._memStats = json.load(memFile)
795  except Exception as e:
796  msg.warning('Failed to load JSON memory summmary file {0}: {1}'.format(self._memSummaryFile, e))
797  self._memMonitor = False
798  self._memStats = {}
799 
800 
    ## @brief Validate the script execution from its return code and count events
    #  @raises trfExceptions.TransformValidationException when the return code is
    #          non-zero (signal exits are mapped to 128 + SIGNUM, shell style)
    def validate(self):
        if self._valStart is None:
            self._valStart = os.times()
            msg.debug('valStart time is {0}'.format(self._valStart))
        self._hasValidated = True

        # Simple binary check: zero is success, anything else is failure
        if self._rc == 0:
            msg.info('Executor {0} validated successfully (return code {1})'.format(self._name, self._rc))
            self._isValidated = True
            self._errMsg = ''
        else:
            # Want to learn as much as possible from the non-zero code
            # this is a bit hard in general, although one can do signals.
            # Probably need to be more specific per exe, i.e., athena non-zero codes
            self._isValidated = False
            if self._rc < 0:
                # Map return codes to what the shell gives (128 + SIGNUM)
                self._rc = 128 - self._rc
            if trfExit.codeToSignalname(self._rc) != "":
                self._errMsg = '{0} got a {1} signal (exit code {2})'.format(self._name, trfExit.codeToSignalname(self._rc), self._rc)
            else:
                self._errMsg = 'Non-zero return code from %s (%d)' % (self._name, self._rc)
            raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_FAIL'), self._errMsg)

        # Event count check, unless explicitly skipped or running under MPI
        if 'checkEventCount' in self.conf.argdict and self.conf.argdict['checkEventCount'].returnMyValue(exe=self) is False:
            msg.info('Event counting for substep {0} is skipped'.format(self.name))
        else:
            if 'mpi' in self.conf.argdict and not mpi.mpiShouldValidate():
                msg.info('MPI mode -- skipping output event count check')
            else:
                checkcount=trfValidation.eventMatch(self)
                checkcount.decide()
                self._eventCount = checkcount.eventCount
                msg.info('Event counting for substep {0} passed'.format(self.name))

        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))
841 
842 
843 
    # Error messages longer than this are summarised in the exit message and the
    # full text left to the jobReport
    _exitMessageLimit = 200 # Maximum error message length to report in the exitMsg
    # Default error mask database used when no explicit ignore files are given
    _defaultIgnorePatternFile = ['atlas_error_mask.db']
847 
848 
885  def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
886  inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'],
887  substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = {}, runtimeRunargs = {},
888  literalRunargs = [], dataArgs = [], checkEventCount = False, errorMaskFiles = None,
889  manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):
890 
891  self._substep = forceToAlphaNum(substep)
892  self._inputEventTest = inputEventTest
893  self._tryDropAndReload = tryDropAndReload
894  self._extraRunargs = extraRunargs
895  self._runtimeRunargs = runtimeRunargs
896  self._literalRunargs = literalRunargs
897  self._dataArgs = dataArgs
898  self._errorMaskFiles = errorMaskFiles
899  self._inputDataTypeCountCheck = inputDataTypeCountCheck
900  self._disableMT = disableMT
901  self._disableMP = disableMP
902  self._onlyMP = onlyMP
903  self._onlyMT = onlyMT
904  self._onlyMPWithRunargs = onlyMPWithRunargs
905  self._skeletonCA=skeletonCA
906 
907  if perfMonFile:
908  self._perfMonFile = None
909  msg.debug("Resource monitoring from PerfMon is now deprecated")
910 
911  # SkeletonFile can be None (disable) or a string or a list of strings - normalise it here
912  if isinstance(skeletonFile, str):
913  self._skeleton = [skeletonFile]
914  else:
915  self._skeleton = skeletonFile
916 
917  super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
918  exeArgs=exeArgs, memMonitor=memMonitor)
919 
920  # Add athena specific metadata
921  self._extraMetadata.update({'substep': substep})
922 
923  # Setup JO templates
924  if self._skeleton or self._skeletonCA:
925  self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
926  else:
927  self._jobOptionsTemplate = None
928 
929  @property
931  return self._inputDataTypeCountCheck
932 
933  @inputDataTypeCountCheck.setter
934  def inputDataTypeCountCheck(self, value):
935  self._inputDataTypeCountCheck = value
936 
    ## @brief Substep name for this athena executor
    @property
    def substep(self):
        return self._substep

    ## @brief Whether AthenaMP is disabled for this substep
    @property
    def disableMP(self):
        return self._disableMP

    @disableMP.setter
    def disableMP(self, value):
        self._disableMP = value

    ## @brief Whether AthenaMT is disabled for this substep
    @property
    def disableMT(self):
        return self._disableMT

    @disableMT.setter
    def disableMT(self, value):
        self._disableMT = value

    ## @brief Whether this substep must run in MP mode only
    @property
    def onlyMP(self):
        return self._onlyMP

    @onlyMP.setter
    def onlyMP(self, value):
        self._onlyMP = value

    ## @brief Whether this substep must run in MT mode only
    @property
    def onlyMT(self):
        return self._onlyMT

    @onlyMT.setter
    def onlyMT(self, value):
        self._onlyMT = value
972 
973  def preExecute(self, input = set(), output = set()):
974  self.setPreExeStart()
975  msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
976 
977  # Check we actually have events to process!
978  inputEvents = 0
979  dt = ""
980  if self._inputDataTypeCountCheck is None:
981  self._inputDataTypeCountCheck = input
982  for dataType in self._inputDataTypeCountCheck:
983  if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
984  continue
985 
986  thisInputEvents = self.conf.dataDictionary[dataType].nentries
987  if thisInputEvents > inputEvents:
988  inputEvents = thisInputEvents
989  dt = dataType
990 
991  # Now take into account skipEvents and maxEvents
992  if ('skipEvents' in self.conf.argdict and
993  self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
994  mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
995  else:
996  mySkipEvents = 0
997 
998  if ('maxEvents' in self.conf.argdict and
999  self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
1000  myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1001  else:
1002  myMaxEvents = -1
1003 
1004  # Any events to process...?
1005  if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
1006  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
1007  'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(mySkipEvents, inputEvents, dt))
1008 
1009  try:
1010  # Expected events to process
1011  if (myMaxEvents != -1):
1012  if (self.inData and next(iter(self.inData)) == 'inNULL'):
1013  expectedEvents = myMaxEvents
1014  else:
1015  expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
1016  else:
1017  expectedEvents = inputEvents-mySkipEvents
1018  except TypeError:
1019  # catching type error from UNDEFINED inputEvents count
1020  msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
1021  expectedEvents = 0
1022 
1023 
1026  OSSetupString = None
1027 
1028  # Extract the asetup string
1029  asetupString = None
1030  legacyThreadingRelease = False
1031  if 'asetup' in self.conf.argdict:
1032  asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1033  legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1034  else:
1035  msg.info('Asetup report: {0}'.format(asetupReport()))
1036 
1037  if asetupString is not None:
1038  legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1039  currentOS = os.environ['ALRB_USER_PLATFORM']
1040  if legacyOSRelease and "centos7" not in currentOS:
1041  OSSetupString = "centos7"
1042  msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
1043 
1044 
1045  # allow overriding the container OS using a flag
1046  if 'runInContainer' in self.conf.argdict:
1047  OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1048  msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
1049  if OSSetupString is not None and asetupString is None:
1050  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1051  '--asetup must be used for the substep which requires --runInContainer')
1052 
1053  # Conditional MP based on runtime arguments
1054  if self._onlyMPWithRunargs:
1055  for k in self._onlyMPWithRunargs:
1056  if k in self.conf._argdict:
1057  self._onlyMP = True
1058 
        # Check the consistency of parallel configuration: CLI flags + environment.
1060  if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
1061  ('ATHENA_CORE_NUMBER' not in os.environ)):
1062  # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
1063  msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1064  else:
1065  # Try to detect AthenaMT mode, number of threads and number of concurrent events
1066  if not self._disableMT:
1067  self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1068 
1069  # Try to detect AthenaMP mode and number of workers
1070  if not self._disableMP:
1071  self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
1072 
1073  # Check that we actually support MT
1074  if self._onlyMP and self._athenaMT > 0:
1075  msg.info("This configuration does not support MT, falling back to MP")
1076  if self._athenaMP == 0:
1077  self._athenaMP = self._athenaMT
1078  self._athenaMT = 0
1079  self._athenaConcurrentEvents = 0
1080 
1081  # Check that we actually support MP
1082  if self._onlyMT and self._athenaMP > 0:
1083  msg.info("This configuration does not support MP, using MT")
1084  if self._athenaMT == 0:
1085  self._athenaMT = self._athenaMP
1087  self._athenaMP = 0
1088 
1089  # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
1090  # which also avoids issues with zero sized files. Distinguish from the no-input case (e.g. evgen)
1091  if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
1092  msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
1093  self._disableMP = True
1094  self._athenaMP = 0
1095 
1096  # Handle executor steps
1097  if self.conf.totalExecutorSteps > 1:
1098  for dataType in output:
1099  if self.conf._dataDictionary[dataType].originalName:
1100  self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1101  else:
1102  self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1103  self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
1104  msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1105 
1106  # And if this is (still) athenaMP, then set some options for workers and output file report
1107  if self._athenaMP > 0:
1108  self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
1109  self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
1110  self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
1111  if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
1113  else:
1114  self._athenaMPReadEventOrders = False
1115  # Decide on scheduling
1116  if ('athenaMPStrategy' in self.conf.argdict and
1117  (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
1118  self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1119  else:
1120  self._athenaMPStrategy = 'SharedQueue'
1121 
1122  # See if we have options for the target output file size
1123  if 'athenaMPMergeTargetSize' in self.conf.argdict:
1124  for dataType in output:
1125  if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
1126  self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
1127  msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1128  else:
1129  # Use a globbing strategy
1130  matchedViaGlob = False
1131  for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
1132  if fnmatch(dataType, mtsType):
1133  self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
1134  msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1135  matchedViaGlob = True
1136  break
1137  if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
1138  self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
1139  msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1140 
1141  # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
1142  # so that the mother process output file (if it exists) can be used directly
1143  # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
1144  for dataType in output:
1145  if self.conf.totalExecutorSteps <= 1:
1146  self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1147  if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
1148  if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
1149  msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
1150  else:
1151  self.conf._dataDictionary[dataType].value[0] += "_000"
1152  msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1153  else:
1154  self._athenaMPWorkerTopDir = self._athenaMPFileReport = None
1155 
1156 
1157  if 'mpi' in self.conf.argdict:
1158  msg.info("Running in MPI mode")
1159  mpi.setupMPIConfig(output, self.conf.dataDictionary)
1160 
1161 
1162  if self._skeleton or self._skeletonCA:
1163  inputFiles = dict()
1164  for dataType in input:
1165  inputFiles[dataType] = self.conf.dataDictionary[dataType]
1166  outputFiles = dict()
1167  for dataType in output:
1168  outputFiles[dataType] = self.conf.dataDictionary[dataType]
1169 
1170  # See if we have any 'extra' file arguments
1171  nameForFiles = commonExecutorStepName(self._name)
1172  for dataType, dataArg in self.conf.dataDictionary.items():
1173  if isinstance(dataArg, list) and dataArg:
1174  if self.conf.totalExecutorSteps <= 1:
1175  raise ValueError('Multiple input arguments provided but only running one substep')
1176  if self.conf.totalExecutorSteps != len(dataArg):
1177  raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1178 
1179  if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
1180  inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1181  else:
1182  if dataArg.io == 'input' and nameForFiles in dataArg.executor:
1183  inputFiles[dataArg.subtype] = dataArg
1184 
1185  msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1186 
1187  # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
1188  self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1189  output = outputFiles)
1190 
1191 
1193  if len(input) > 0:
1194  self._extraMetadata['inputs'] = list(input)
1195  if len(output) > 0:
1196  self._extraMetadata['outputs'] = list(output)
1197 
1198 
1199  dbrelease = dbsetup = None
1200  if 'DBRelease' in self.conf.argdict:
1201  dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1202  if path.islink(dbrelease):
1203  dbrelease = path.realpath(dbrelease)
1204  if dbrelease:
1205  # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
1206  dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1207  if dbdMatch:
1208  msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1209  if not os.access(dbrelease, os.R_OK):
1210  msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1211  msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1212  dbrelease = dbdMatch.group(1)
1213  dbsetup = cvmfsDBReleaseCheck(dbrelease)
1214  else:
1215  # Check if the DBRelease is setup
1216  msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1217  unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1218  if unpacked:
1219  # Now run the setup.py script to customise the paths to the current location...
1220  setupDBRelease(dbsetup)
1221  # For cvmfs we want just the X.Y.Z release string (and also support 'current')
1222  else:
1223  dbsetup = cvmfsDBReleaseCheck(dbrelease)
1224 
1225 
1227  self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1228  self._prepAthenaCommandLine()
1229 
1230 
1231  super(athenaExecutor, self).preExecute(input, output)
1232 
1233  # Now we always write a wrapper, because it's very convenient for re-running individual substeps
1234  # This will have asetup and/or DB release setups in it
1235  # Do this last in this preExecute as the _cmd needs to be finalised
1236  msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
1237  self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1238  msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1239 
1240 
    def postExecute(self):
        """Post-execution actions for athena jobs.

        Performs, in order: merging of outputs when running in MPI mode
        (--mpi); recombination and merging of per-step outputs when running
        with multiple executor steps; AthenaMP worker output gathering and
        smart merging where more than one output file was produced; tar/gzip
        of JiveXML output; and optional removal of intermediate temporary
        input files (--deleteIntermediateOutputfiles).
        """
        super(athenaExecutor, self).postExecute()
        # MPI merging - combine the outputs produced when running with --mpi
        if 'mpi' in self.conf.argdict:
            mpi.mergeOutputs()

        # Handle executor substeps
        if self.conf.totalExecutorSteps > 1:
            if self._athenaMP > 0:
                outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
                athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
            # Only on the final executor step do we recombine the outputs of all steps
            if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
                # first loop over datasets for the output
                for dataType in self._output:
                    newValue = []
                    if self._athenaMP > 0:
                        # assume the same number of workers all the time
                        for i in range(self.conf.totalExecutorSteps):
                            for v in self.conf.dataDictionary[dataType].value:
                                # Rebuild the per-step filenames by swapping the step suffix
                                newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
                                                          '_{0}{1}_'.format(executorStepSuffix, i)))
                    else:
                        self.conf.dataDictionary[dataType].multipleOK = True
                        # just combine all executors
                        for i in range(self.conf.totalExecutorSteps):
                            newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
                    self.conf.dataDictionary[dataType].value = newValue

                    # do the merging if needed
                    if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
                        self._smartMerge(self.conf.dataDictionary[dataType])

        # If this was an athenaMP run then we need to update output files
        elif self._athenaMP > 0:
            outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])

            # With the event service active, per-worker output files may legitimately be absent
            skipFileChecks=False
            if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
                skipFileChecks=True
            athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
            for dataType in self._output:
                if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
                    self._smartMerge(self.conf.dataDictionary[dataType])

        if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
            self._targzipJiveXML()

        # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
        # This is a bit ugly to have such a specific feature here though
        # TODO
        # The best is to have a general approach so that user can extract useful info from log
        # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
        # in which the wanted variable is grouped by a name, then transforms could decode the pattern
        # and use it to extract required info and do the summation during log scan.
        if self._logFileName=='log.ReSim' and self.name=='ReSim':
            msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))

        # Remove intermediate input/output files of sub-steps
        # Delete only files with io="temporary" which are files with pattern "tmp*"
        # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
        # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
        # Enable if --deleteIntermediateOutputfiles is set
        if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
            inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])

            for k, v in inputDataDictionary.items():
                if not v.io == 'temporary':
                    continue
                for filename in v.value:
                    # Never delete files living on the read-only /cvmfs area
                    if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
                        msg.info("Removing intermediate {0} input file {1}".format(k, filename))
                        # Check if symbolic link and delete also linked file
                        if (os.path.realpath(filename) != filename):
                            targetpath = os.path.realpath(filename)
                        os.unlink(filename)
                        if (targetpath) and os.access(targetpath, os.R_OK):
                            os.unlink(targetpath)
1319 
1320 
    def validate(self):
        """Validate this executor's athena job.

        Steps: validate the return code (a failure there is deferred until
        after the log scan so the report can be enriched); analyse the memory
        monitor output for a possible leak; scan the athena logfile against
        the configured ignore patterns; then either re-raise the deferred
        return-code exception or raise TransformLogfileErrorException on a
        fatal log error (suppressed when ignoreErrors=True). On success sets
        self._isValidated and records the validation stop time.
        """
        self.setValStart()
        self._hasValidated = True
        deferredException = None
        # Slope threshold (KB/s, 'pss' fit) above which a possible memory leak is flagged
        memLeakThreshold = 5000
        _hasMemLeak = False


        try:
            super(athenaExecutor, self).validate()
            # In this case we hold this exception until the logfile has been scanned
            msg.error('Validation of return code failed: {0!s}'.format(e))
            deferredException = e


        # Memory leak analysis from the memory monitor output, if we have one
        if self._memFullFile:
            msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
            self._memLeakResult = analytic().getFittedData(self._memFullFile)
            if self._memLeakResult:
                if self._memLeakResult['slope'] > memLeakThreshold:
                    _hasMemLeak = True
                    msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
            else:
                msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
        else:
            msg.info('No memory monitor file to be analysed')

        # Logfile scan setup
        # Always use ignorePatterns from the command line
        # For patterns in files, prefer the command line first, then any special settings for
        # this executor, then fallback to the standard default (atlas_error_mask.db)
        if 'ignorePatterns' in self.conf.argdict:
            igPat = self.conf.argdict['ignorePatterns'].value
        else:
            igPat = []
        if 'ignoreFiles' in self.conf.argdict:
            ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
        elif self._errorMaskFiles is not None:
            ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
        else:
            ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)

        # Now actually scan my logfile
        msg.info('Scanning logfile {0} for errors in substep {1}'.format(self._logFileName, self._substep))
        self._logScan = trfValidation.athenaLogFileReport(logfile=self._logFileName, substepName=self._substep,
                                                          ignoreList=ignorePatterns)
        worstError = self._logScan.worstError()
        eventLoopWarnings = self._logScan.eventLoopWarnings()


        # In general we add the error message to the exit message, but if it's too long then don't do
        # that and just say look in the jobReport
        if worstError['firstError']:
            if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
                # Too long for the exit message: classify the error and point at the jobReport
                if 'CoreDumpSvc' in worstError['firstError']['message']:
                    exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
                elif 'G4Exception' in worstError['firstError']['message']:
                    exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
                else:
                    exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
            else:
                exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
        else:
            exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])

        # If we failed on the rc, then abort now
        if deferredException is not None:
            # Add any logfile information we have
            if worstError['nLevel'] >= stdLogLevels['ERROR']:
                deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
            # Add the result of memory analysis
            if _hasMemLeak:
                deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
            raise deferredException


        # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
        if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
            msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
        elif worstError['nLevel'] >= stdLogLevels['ERROR']:
            self._isValidated = False
            msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
            # Add the result of memory analysis
            if _hasMemLeak:
                exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
            raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
                                                               'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))

        # Print event loop warnings
        if (len(eventLoopWarnings) > 0):
            msg.warning('Found WARNINGS in the event loop, as follows:')
            for element in eventLoopWarnings:
                msg.warning('{0} {1} ({2} instances)'.format(element['item']['service'],element['item']['message'],element['count']))

        # Must be ok if we got here!
        msg.info('Executor {0} has validated successfully'.format(self.name))
        self._isValidated = True

        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))
1429 
1430 
1431  def _isCAEnabled(self):
1432  # CA not present
1433  if 'CA' not in self.conf.argdict:
1434  # If there is no legacy skeleton, then we are running with CA
1435  if not self._skeleton:
1436  return True
1437  else:
1438  return False
1439 
1440  # CA present but None, all substeps running with CA
1441  if self.conf.argdict['CA'] is None:
1442  return True
1443 
1444  # CA enabled for a substep, running with CA
1445  if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
1446  return True
1447 
1448  return False
1449 
1450 
1452 
        # Build the athena command line: executable, athenaopts (with any
        # LD_PRELOAD merge), --drop-and-reload where safe, --threads/--nprocs
        # for non-CA jobs, and finally the top options files.
        # Allow a non-default athena executable to be used (--athena)
        if 'athena' in self.conf.argdict:
            self._exe = self.conf.argdict['athena'].value
        self._cmd = [self._exe]

        # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
        currentSubstep = None
        if 'athenaopts' in self.conf.argdict:
            currentName = commonExecutorStepName(self.name)
            if currentName in self.conf.argdict['athenaopts'].value:
                currentSubstep = currentName
                # If options are given under both the name and the alias, merge them under the name
                if self.substep in self.conf.argdict['athenaopts'].value:
                    msg.info('Athenaopts found for {0} and {1}, joining options. '
                             'Consider changing your configuration to use just the name or the alias of the substep.'
                             .format(currentSubstep, self.substep))
                    self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
                    del self.conf.argdict['athenaopts'].value[self.substep]
                    msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
            elif self.substep in self.conf.argdict['athenaopts'].value:
                currentSubstep = self.substep
            elif 'all' in self.conf.argdict['athenaopts'].value:
                currentSubstep = 'all'

        # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
        preLoadUpdated = dict()
        if 'LD_PRELOAD' in self._envUpdate._envdict:
            preLoadUpdated[currentSubstep] = False
            if 'athenaopts' in self.conf.argdict:
                if currentSubstep is not None:
                    for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
                        # This code is pretty ugly as the athenaopts argument contains
                        # strings which are really key/value pairs
                        if athArg.startswith('--preloadlib'):
                            try:
                                i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
                                v = athArg.split('=', 1)[1]
                                msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
                                # Union of the existing preloads and the env update's preloads (deduplicated)
                                newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
                                self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
                            except Exception as e:
                                msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
                            preLoadUpdated[currentSubstep] = True
                            break
            # No existing --preloadlib option was found: add one from the environment update
            if not preLoadUpdated[currentSubstep]:
                msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
                if 'athenaopts' in self.conf.argdict:
                    if currentSubstep is not None:
                        self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
                    else:
                        self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
                else:
                    self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])

        # Now update command line with the options we have (including any changes to preload)
        if 'athenaopts' in self.conf.argdict:
            if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
                self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
            elif currentSubstep in self.conf.argdict['athenaopts'].value:
                self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])

        # From here on treat "no substep matched" the same as the 'all' key
        if currentSubstep is None:
            currentSubstep = 'all'

        if self._tryDropAndReload:
            if self._isCAEnabled():
                msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
            elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
                msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
            elif 'athenaopts' in self.conf.argdict:
                athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
                # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
                if currentSubstep in self.conf.argdict['athenaopts'].value:
                    conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
                    if len(conflictOpts) > 0:
                        msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
                    else:
                        msg.info('Appending "--drop-and-reload" to athena options')
                        self._cmd.append('--drop-and-reload')
                else:
                    msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
                    self._cmd.append('--drop-and-reload')
            else:
                # This is the 'standard' case - so drop and reload should be ok
                msg.info('Appending "--drop-and-reload" to athena options')
                self._cmd.append('--drop-and-reload')
        else:
            msg.info('Skipping test for "--drop-and-reload" in this executor')

        if not self._isCAEnabled(): #For CA-jobs, threads and nproc set in runargs file
            # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
            if self._athenaMT > 0 and not self._disableMT:
                # Only add the option when the user has not already set one via athenaopts
                if not ('athenaopts' in self.conf.argdict and
                        any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
                    self._cmd.append('--threads=%s' % str(self._athenaMT))

            # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
            if self._athenaMP > 0 and not self._disableMP:
                # Only add the option when the user has not already set one via athenaopts
                if not ('athenaopts' in self.conf.argdict and
                        any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
                    self._cmd.append('--nprocs=%s' % str(self._athenaMP))

        # Add topoptions
        if self._skeleton or self._skeletonCA:
            self._cmd += self._topOptionsFiles
            msg.info('Updated script arguments with topoptions: %s', self._cmd)
1559 
1560 
1561 
1563  self,
1564  asetup = None,
1565  dbsetup = None,
1566  ossetup = None
1567  ):
1568  self._originalCmd = self._cmd
1569  self._asetup = asetup
1570  self._dbsetup = dbsetup
1571  self._containerSetup = ossetup
1572  self._workdir = os.getcwd()
1573  self._alreadyInContainer = self._workdir.startswith("/srv")
1574  self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
1575  self._setupFile = 'setup.{name}.sh'.format(name = self._name)
1576 
1577  # Create a setupATLAS script
1578  setupATLAS = 'my_setupATLAS.sh'
1579  with open(setupATLAS, 'w') as f:
1580  print("#!/bin/bash", file=f)
1581  print("""
1582 if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1583  export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1584 fi
1585 source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1586  , file=f)
1587  os.chmod(setupATLAS, 0o755)
1588 
1589  msg.debug(
1590  'Preparing wrapper file {wrapperFileName} with '
1591  'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1592  wrapperFileName = self._wrapperFile,
1593  asetupStatus = self._asetup,
1594  dbsetupStatus = self._dbsetup
1595  )
1596  )
1597 
1598  container_cmd = None
1599  try:
1600  with open(self._wrapperFile, 'w') as wrapper:
1601  print('#!/bin/sh', file=wrapper)
1602  if self._containerSetup is not None:
1603  container_cmd = [ os.path.abspath(setupATLAS),
1604  "-c",
1605  self._containerSetup,
1606  "--pwd",
1607  self._workdir,
1608  "-s",
1609  os.path.join('.', self._setupFile),
1610  "-r"]
1611  print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1612  print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
1613  print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1614  file = wrapper)
1615  print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1616  file=wrapper)
1617  print('echo', file=wrapper)
1618 
1619  if asetup:
1620  wfile = wrapper
1621  asetupFile = None
1622  # if the substep is executed within a container, setup athena with a separate script
1623  # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
1624  if self._containerSetup is not None:
1625  asetupFile = open(self._setupFile, 'w')
1626  wfile = asetupFile
1627  print(f'source ./{setupATLAS} -q', file=wfile)
1628  print(f'asetup {asetup}', file=wfile)
1629  print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1630  if dbsetup:
1631  dbroot = path.dirname(dbsetup)
1632  dbversion = path.basename(dbroot)
1633  print("# DBRelease setup", file=wrapper)
1634  print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1635  print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1636  print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1637  print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1638  print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
1639  print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1640  if self._disableMT:
1641  print("# AthenaMT explicitly disabled for this executor", file=wrapper)
1642  if self._disableMP:
1643  print("# AthenaMP explicitly disabled for this executor", file=wrapper)
1644  if self._envUpdate.len > 0:
1645  for envSetting in self._envUpdate.values:
1646  if not envSetting.startswith('LD_PRELOAD'):
1647  print("export", envSetting, file=wrapper)
1648  # If Valgrind is engaged, a serialised Athena configuration file
1649  # is generated for use with a subsequent run of Athena with
1650  # Valgrind.
1651  if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1652  msg.info('Valgrind engaged')
1653  # Define the file name of the serialised Athena
1654  # configuration.
1655  AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1656  name = self._name
1657  )
1658  # Run Athena for generation of its serialised configuration.
1659  print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1660  print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1661  # Generate a Valgrind command, suppressing or ussing default
1662  # options as requested and extra options as requested.
1663  if 'valgrindDefaultOpts' in self.conf._argdict:
1664  defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
1665  else:
1666  defaultOptions = True
1667  if 'valgrindExtraOpts' in self.conf._argdict:
1668  extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
1669  else:
1670  extraOptionsList = None
1671  msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
1672  msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1673  command = ValgrindCommand(
1674  defaultOptions = defaultOptions,
1675  extraOptionsList = extraOptionsList,
1676  AthenaSerialisedConfigurationFile = \
1677  AthenaSerialisedConfigurationFile
1678  )
1679  msg.debug("Valgrind command: {command}".format(command = command))
1680  print(command, file=wrapper)
1681  # If VTune is engaged, a serialised Athena configuration file
1682  # is generated for use with a subsequent run of Athena with
1683  # VTune.
1684  elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
1685  msg.info('VTune engaged')
1686  # Define the file name of the serialised Athena
1687  # configuration.
1688  AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1689  name = self._name
1690  )
1691  # Run Athena for generation of its serialised configuration.
1692  print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1693  print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1694  # Generate a VTune command, suppressing or ussing default
1695  # options as requested and extra options as requested.
1696  if 'vtuneDefaultOpts' in self.conf._argdict:
1697  defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
1698  else:
1699  defaultOptions = True
1700  if 'vtuneExtraOpts' in self.conf._argdict:
1701  extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
1702  else:
1703  extraOptionsList = None
1704  msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
1705  msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
1706  command = VTuneCommand(
1707  defaultOptions = defaultOptions,
1708  extraOptionsList = extraOptionsList,
1709  AthenaSerialisedConfigurationFile = \
1710  AthenaSerialisedConfigurationFile
1711  )
1712  msg.debug("VTune command: {command}".format(command = command))
1713  print(command, file=wrapper)
1714  else:
1715  msg.info('Valgrind/VTune not engaged')
1716  # run Athena command
1717  print(' '.join(self._cmd), file=wrapper)
1718  os.chmod(self._wrapperFile, 0o755)
1719  except OSError as e:
1720  errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
1721  fileName = self._wrapperFile,
1722  error = e
1723  )
1724  msg.error(errMsg)
1726  trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
1727  errMsg
1728  )
1729  self._cmd = [ path.join('.', self._wrapperFile) ]
1730  if self._containerSetup is not None:
1731  asetupFile.close()
1732  self._cmd = container_cmd + self._cmd
1733 
1734 
1735 
1737  def _smartMerge(self, fileArg):
1738 
1739  if 'selfMerge' not in dir(fileArg):
1740  msg.info('Files in {0} cannot merged (no selfMerge() method is implemented)'.format(fileArg.name))
1741  return
1742 
1743  if fileArg.mergeTargetSize == 0:
1744  msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
1745  return
1746 
1747 
1748  mergeCandidates = [list()]
1749  currentMergeSize = 0
1750  for fname in fileArg.value:
1751  size = fileArg.getSingleMetadata(fname, 'file_size')
1752  if not isinstance(size, int):
1753  msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
1754  return
1755  # if there is no file in the job, then we must add it
1756  if len(mergeCandidates[-1]) == 0:
1757  msg.debug('Adding file {0} to current empty merge list'.format(fname))
1758  mergeCandidates[-1].append(fname)
1759  currentMergeSize += size
1760  continue
1761  # see if adding this file gets us closer to the target size (but always add if target size is negative)
1762  if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
1763  msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
1764  mergeCandidates[-1].append(fname)
1765  currentMergeSize += size
1766  continue
1767  # close this merge list and start a new one
1768  msg.debug('Starting a new merge list with file {0}'.format(fname))
1769  mergeCandidates.append([fname])
1770  currentMergeSize = size
1771 
1772  msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
1773 
1774  if len(mergeCandidates) == 1:
1775  # Merging to a single file, so use the original filename that the transform
1776  # was started with
1777  mergeNames = [fileArg.originalName]
1778  else:
1779  # Multiple merge targets, so we need a set of unique names
1780  counter = 0
1781  mergeNames = []
1782  for mergeGroup in mergeCandidates:
1783  # Note that the individual worker files get numbered with 3 digit padding,
1784  # so these non-padded merges should be fine
1785  mergeName = fileArg.originalName + '_{0}'.format(counter)
1786  while path.exists(mergeName):
1787  counter += 1
1788  mergeName = fileArg.originalName + '_{0}'.format(counter)
1789  mergeNames.append(mergeName)
1790  counter += 1
1791  # Now actually do the merges
1792  for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
1793  msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
1794  if len(mergeGroup) <= 1:
1795  msg.info('Skip merging for single file')
1796  else:
1797 
1798  self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
1799 
1800 
1801  def _targzipJiveXML(self):
1802  #tgzipping JiveXML files
1803  targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
1804  if os.path.exists(targetTGZName):
1805  os.remove(targetTGZName)
1806 
1807  import tarfile
1808  fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")
1809 
1810  # force gz compression
1811  tar = tarfile.open(targetTGZName, "w:gz")
1812  for fName in os.listdir('.'):
1813  matches = fNameRE.findall(fName)
1814  if len(matches) > 0:
1815  if fNameRE.findall(fName)[0] == fName:
1816  msg.info('adding %s to %s', fName, targetTGZName)
1817  tar.add(fName)
1818 
1819  tar.close()
1820  msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
1821 
1822 
1823 
1825 
1826  # Here we validate, but will suppress any errors
1827  def validate(self):
1828  self.setValStart()
1829  try:
1830  super(optionalAthenaExecutor, self).validate()
1832  # In this case we hold this exception until the logfile has been scanned
1833  msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
1834  self._isValidated = False
1835  self._errMsg = e.errMsg
1836  self._rc = e.errCode
1837  self._valStop = os.times()
1838  msg.debug('valStop time is {0}'.format(self._valStop))
1839 
1840 
1842 
1853  def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
1854  inData = set(), outData = set(), exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True,
1855  perfMonFile = None, tryDropAndReload = True, extraRunargs = {},
1856  manualDataDictionary = None, memMonitor = True):
1857 
1858  super(POOLMergeExecutor, self).__init__(name, trf=trf, conf=conf, skeletonFile=skeletonFile, skeletonCA=skeletonCA,
1859  inData=inData, outData=outData, exe=exe, exeArgs=exeArgs, substep=substep,
1860  inputEventTest=inputEventTest, perfMonFile=perfMonFile,
1861  tryDropAndReload=tryDropAndReload, extraRunargs=extraRunargs,
1862  manualDataDictionary=manualDataDictionary, memMonitor=memMonitor)
1863 
1864  def preExecute(self, input = set(), output = set()):
1865  self.setPreExeStart()
1866  super(POOLMergeExecutor, self).preExecute(input=input, output=output)
1867 
1868 
1869  def execute(self):
1870  # First call the parent executor, which will manage the athena execution for us
1871  super(POOLMergeExecutor, self).execute()
1872 
1873 
1874 
1877 
1879  def preExecute(self, input=set(), output=set()):
1880  self.setPreExeStart()
1881  msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
1882  if 'NTUP_PILEUP' not in output:
1883  # New derivation framework transform uses "formats"
1884  if 'formats' not in self.conf.argdict:
1885  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
1886  'No derivation configuration specified')
1887 
1888  if ('DAOD' not in output) and ('D2AOD' not in output):
1889  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
1890  'No base name for DAOD output')
1891 
1892  formatList = []
1893  if 'formats' in self.conf.argdict: formatList = self.conf.argdict['formats'].value
1894  for reduction in formatList:
1895  if ('DAOD' in output):
1896  dataType = 'DAOD_' + reduction
1897  if 'augmentations' not in self.conf.argdict:
1898  outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
1899  else:
1900  for val in self.conf.argdict['augmentations'].value:
1901  if reduction in val.split(':')[0]:
1902  outputName = 'DAOD_' + val.split(':')[1] + '.' + self.conf.argdict['outputDAODFile'].value[0]
1903  break
1904  else:
1905  outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
1906 
1907  if ('D2AOD' in output):
1908  dataType = 'D2AOD_' + reduction
1909  outputName = 'D2AOD_' + reduction + '.' + self.conf.argdict['outputD2AODFile'].value[0]
1910 
1911  msg.info('Adding reduction output type {0}'.format(dataType))
1912  output.add(dataType)
1913  newReduction = trfArgClasses.argPOOLFile(outputName, io='output', runarg=True, type='AOD',
1914  name=reduction)
1915  # References to _trf - can this be removed?
1916  self.conf.dataDictionary[dataType] = newReduction
1917 
1918  # Clean up the stub file from the executor input and the transform's data dictionary
1919  # (we don't remove the actual argFile instance)
1920  if ('DAOD' in output):
1921  output.remove('DAOD')
1922  del self.conf.dataDictionary['DAOD']
1923  del self.conf.argdict['outputDAODFile']
1924  if ('D2AOD' in output):
1925  output.remove('D2AOD')
1926  del self.conf.dataDictionary['D2AOD']
1927  del self.conf.argdict['outputD2AODFile']
1928 
1929  msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
1930  msg.info('Input/Output: {0}/{1}'.format(input, output))
1931 
1932  msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
1933  msg.info('Input/Output: {0}/{1}'.format(input, output))
1934  super(reductionFrameworkExecutor, self).preExecute(input, output)
1935 
1936 
1937 
1939  def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']),
1940  exe='DQHistogramMerge.py', exeArgs = [], memMonitor = True):
1942  self._histMergeList = 'HISTMergeList.txt'
1943 
1944  super(DQMergeExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
1945  exeArgs=exeArgs, memMonitor=memMonitor)
1946 
1947 
1948  def preExecute(self, input = set(), output = set()):
1949  self.setPreExeStart()
1950  msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
1951 
1952  super(DQMergeExecutor, self).preExecute(input=input, output=output)
1953 
1954  # Write the list of files to be merged
1955  with open(self._histMergeList, 'w') as DQMergeFile:
1956  for dataType in input:
1957  for fname in self.conf.dataDictionary[dataType].value:
1958  self.conf.dataDictionary[dataType]._getNumberOfEvents([fname])
1959  print(fname, file=DQMergeFile)
1960 
1961  self._cmd.append(self._histMergeList)
1962 
1963  # Add the output file
1964  if len(output) != 1:
1965  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1966  'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
1967  outDataType = list(output)[0]
1968  self._cmd.append(self.conf.dataDictionary[outDataType].value[0])
1969 
1970  # Set the run_post_processing to True/False
1971  if (self.conf._argdict.get("run_post_processing",False)):
1972  self._cmd.append('True')
1973  else:
1974  self._cmd.append('False')
1975 
1976  if (self.conf._argdict.get("is_incremental_merge",False)):
1977  self._cmd.append('True')
1978  else:
1979  self._cmd.append('False')
1980 
1981  for k in ("excludeHist","excludeDir"):
1982  if k in self.conf._argdict:
1983  self._cmd.append("--{0}={1}".format(k,self.conf._argdict[k]))
1984 
1985 
1986  def validate(self):
1987  self.setValStart()
1988  super(DQMergeExecutor, self).validate()
1989 
1990  exitErrorMessage = ''
1991  # Base class validation successful, Now scan the logfile for missed errors.
1992  try:
1994  worstError = logScan.worstError()
1995 
1996  # In general we add the error message to the exit message, but if it's too long then don't do
1997  # that and just say look in the jobReport
1998  if worstError['firstError']:
1999  if len(worstError['firstError']['message']) > logScan._msgLimit:
2000  exitErrorMessage = "Long {0} message at line {1}" \
2001  " (see jobReport for further details)".format(worstError['level'],
2002  worstError['firstError']['firstLine'])
2003  else:
2004  exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
2005  worstError['firstError']['message'])
2006  except OSError as e:
2007  exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2009  'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))
2010 
2011  if worstError['nLevel'] == stdLogLevels['ERROR'] and (
2012  'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
2013  msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2014 
2015  elif worstError['nLevel'] >= stdLogLevels['ERROR']:
2016  self._isValidated = False
2017  msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
2018  exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2019  raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))
2020 
2021  # Must be ok if we got here!
2022  msg.info('Executor {0} has validated successfully'.format(self.name))
2023  self._isValidated = True
2024 
2025  self._valStop = os.times()
2026  msg.debug('valStop time is {0}'.format(self._valStop))
2027 
2028 
2030  def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']),
2031  exe='DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor = True):
2033  self._histMergeList = 'HISTMergeList.txt'
2034 
2035  super(DQMPostProcessExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
2036  exeArgs=exeArgs, memMonitor=memMonitor)
2037 
2038 
2039  def preExecute(self, input = set(), output = set()):
2040  self.setPreExeStart()
2041  msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
2042 
2043  super(DQMPostProcessExecutor, self).preExecute(input=input, output=output)
2044 
2045  #build input file list (typically only one):
2046  dsName=self.conf.argdict["inputHISTFile"].dataset
2047  inputList=[]
2048  for dataType in input:
2049  for fname in self.conf.dataDictionary[dataType].value:
2050  #if no dataset name is give, guess it from file name
2051  if not dsName: dsName=".".join(fname.split('.')[0:4])
2052  inputList.append("#".join([dsName,fname]))
2053 
2054 
2055  if len(output) != 1:
2056  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
2057  'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
2058  outDataType = list(output)[0]
2059  #build argument json:
2060  #T0 uses keys: 'allowCOOLUpload', 'doWebDisplay', 'incrementalMode', 'inputHistFiles', 'mergeParams', 'outputHistFile', 'postProcessing', 'reverseFileOrder', 'servers'
2061  #more keys: runNumber, streamName,projectTag,filepaths,productionMode,skipMerge
2062  wrapperParams={"inputHistFiles" : inputList,
2063  "outputHistFile" : dsName+"#"+self.conf.dataDictionary[outDataType].value[0],
2064  "incrementalMode": "True" if self.conf._argdict.get("is_incremental_merge",False) else "False",
2065  "postProcessing" : "True" if self.conf._argdict.get("run_post_processing",False) else "False",
2066  "doWebDisplay" : "True" if self.conf._argdict.get("doWebDisplay",False) else "False",
2067  "allowCOOLUpload": "True" if self.conf._argdict.get("allowCOOLUpload",False) else "False",
2068  "mergeParams" : "",
2069 
2070  }
2071  if "servers" in self.conf._argdict:
2072  wrapperParams["server"]=self.conf._argdict["servers"]
2073 
2074  for k in ("excludeHist","excludeDir"):
2075  if k in self.conf._argdict:
2076  wrapperParams["mergeParams"]+=(" --{0}={1}".format(k,self.conf._argdict[k]))
2077 
2078 
2079  with open("args.json", "w") as f:
2080  json.dump(wrapperParams, f)
2081 
2082  self._cmd.append("--argJSON=args.json")
2083 
2084 
2085 
2086  def validate(self):
2087  self.setValStart()
2088  super(DQMPostProcessExecutor, self).validate()
2089 
2090  exitErrorMessage = ''
2091  # Base class validation successful, Now scan the logfile for missed errors.
2092  try:
2094  worstError = logScan.worstError()
2095 
2096  # In general we add the error message to the exit message, but if it's too long then don't do
2097  # that and just say look in the jobReport
2098  if worstError['firstError']:
2099  if len(worstError['firstError']['message']) > logScan._msgLimit:
2100  exitErrorMessage = "Long {0} message at line {1}" \
2101  " (see jobReport for further details)".format(worstError['level'],
2102  worstError['firstError']['firstLine'])
2103  else:
2104  exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
2105  worstError['firstError']['message'])
2106  except OSError as e:
2107  exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2109  'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))
2110 
2111  if worstError['nLevel'] == stdLogLevels['ERROR'] and (
2112  'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
2113  msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2114 
2115  elif worstError['nLevel'] >= stdLogLevels['ERROR']:
2116  self._isValidated = False
2117  msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
2118  exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2119  raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))
2120 
2121  # Must be ok if we got here!
2122  msg.info('Executor {0} has validated successfully'.format(self.name))
2123  self._isValidated = True
2124 
2125  self._valStop = os.times()
2126  msg.debug('valStop time is {0}'.format(self._valStop))
2127 
2129 
2130  def preExecute(self, input = set(), output = set()):
2131  self.setPreExeStart()
2132  msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
2133 
2134  # Basic command, and allow overwrite of the output file
2135  if self._exe is None:
2136  self._exe = 'hadd'
2137  self._cmd = [self._exe, "-f"]
2138 
2139 
2140  # Add the output file
2141  if len(output) != 1:
2142  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
2143  'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
2144  outDataType = list(output)[0]
2145  self._cmd.append(self.conf.dataDictionary[outDataType].value[0])
2146  # Add to be merged to the cmd chain
2147  for dataType in input:
2148  self._cmd.extend(self.conf.dataDictionary[dataType].value)
2149 
2150  super(NTUPMergeExecutor, self).preExecute(input=input, output=output)
2151 
2152 
2153 
2155 
    def preExecute(self, input = set(), output = set()):
        """Prepare the bytestream merge step.

        Optionally masks empty input files, writes the surviving file names to
        a list file for the merge executable, checks/derives the output file
        name (which must end in '._0001.data' unless renaming is allowed), and
        assembles the merge command line.

        @param input set of input data type names (exactly one is used)
        @param output set of output data type names (exactly one is used)
        @raises trfExceptions.TransformExecutionException on all-empty inputs
        without a stub file, list-file write errors, or a bad output file name.
        """
        self.setPreExeStart()
        # Data type keys for the (single) input and output BS collections
        self._inputBS = list(input)[0]
        self._outputBS = list(output)[0]
        self._maskedFiles = []
        # When True, execute() fakes the merge and substitutes the stub file
        self._useStubFile = False
        if 'maskEmptyInputs' in self.conf.argdict and self.conf.argdict['maskEmptyInputs'].value is True:
            eventfullFiles = []
            for fname in self.conf.dataDictionary[self._inputBS].value:
                nEvents = self.conf.dataDictionary[self._inputBS].getSingleMetadata(fname, 'nentries')
                msg.debug('Found {0} events in file {1}'.format(nEvents, fname))
                # Keep only files with a usable, positive event count
                if isinstance(nEvents, int) and nEvents > 0:
                    eventfullFiles.append(fname)
            self._maskedFiles = list(set(self.conf.dataDictionary[self._inputBS].value) - set(eventfullFiles))
            if len(self._maskedFiles) > 0:
                msg.info('The following input files are masked because they have 0 events: {0}'.format(' '.join(self._maskedFiles)))
                if len(eventfullFiles) == 0:
                    # All inputs empty: either fall back to the stub file or abort
                    if 'emptyStubFile' in self.conf.argdict and path.exists(self.conf.argdict['emptyStubFile'].value):
                        self._useStubFile = True
                        msg.info("All input files are empty - will use stub file {0} as output".format(self.conf.argdict['emptyStubFile'].value))
                    else:
                        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
                                                                        'All input files had zero events - aborting BS merge')

        # Write the list of input files to a text file, so that testMergedFiles can swallow it
        self._mergeBSFileList = '{0}.list'.format(self._name)
        self._mergeBSLogfile = '{0}.out'.format(self._name)
        try:
            with open(self._mergeBSFileList, 'w') as BSFileList:
                for fname in self.conf.dataDictionary[self._inputBS].value:
                    # Masked (zero-event) files are excluded from the merge list
                    if fname not in self._maskedFiles:
                        print(fname, file=BSFileList)
        except OSError as e:
            errMsg = 'Got an error when writing list of BS files to {0}: {1}'.format(self._mergeBSFileList, e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'), errMsg)

        # Hope that we were given a correct filename...
        self._outputFilename = self.conf.dataDictionary[self._outputBS].value[0]
        if self._outputFilename.endswith('._0001.data'):
            # The merge tool appends '._0001.data' itself, so strip it here
            self._doRename = False
            self._outputFilename = self._outputFilename.split('._0001.data')[0]
        elif self.conf.argdict['allowRename'].value is True:
            # OK, non-fatal, we go for a renaming
            msg.info('Output filename does not end in "._0001.data" will proceed, but be aware that the internal filename metadata will be wrong')
            self._doRename = True
        else:
            # No rename allowed, so we are dead...
            errmsg = 'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'), errmsg)

        # Set the correct command for execution
        self._cmd = [self._exe, self._mergeBSFileList, '0', self._outputFilename]

        super(bsMergeExecutor, self).preExecute(input=input, output=output)
2211 
2212  def execute(self):
2213  if self._useStubFile:
2214  # Need to fake execution!
2215  self._exeStart = os.times()
2216  msg.debug('exeStart time is {0}'.format(self._exeStart))
2217  msg.info("Using stub file for empty BS output - execution is fake")
2218  if self._outputFilename != self.conf.argdict['emptyStubFile'].value:
2219  os.rename(self.conf.argdict['emptyStubFile'].value, self._outputFilename)
2220  self._memMonitor = False
2221  self._hasExecuted = True
2222  self._rc = 0
2223  self._exeStop = os.times()
2224  msg.debug('exeStop time is {0}'.format(self._exeStop))
2225  else:
2226  super(bsMergeExecutor, self).execute()
2227 
2228  def postExecute(self):
2229  if self._useStubFile:
2230  pass
2231  elif self._doRename:
2232  self._expectedOutput = self._outputFilename + '._0001.data'
2233  msg.info('Renaming {0} to {1}'.format(self._expectedOutput, self.conf.dataDictionary[self._outputBS].value[0]))
2234  try:
2235  os.rename(self._outputFilename + '._0001.data', self.conf.dataDictionary[self._outputBS].value[0])
2236  except OSError as e:
2237  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
2238  'Exception raised when renaming {0} to {1}: {2}'.format(self._outputFilename, self.conf.dataDictionary[self._outputBS].value[0], e))
2239  super(bsMergeExecutor, self).postExecute()
2240 
2241 
2242 
2244 
2245  def preExecute(self, input = set(), output = set()):
2246  self.setPreExeStart()
2247  self._memMonitor = False
2248 
2249  #archiving
2250  if self._exe == 'zip':
2251  if 'outputArchFile' not in self.conf.argdict:
2252  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_MISSING'), 'Missing output file name')
2253 
2254  self._cmd = ['python']
2255  try:
2256  with open('zip_wrapper.py', 'w') as zip_wrapper:
2257  print("import zipfile, os, shutil", file=zip_wrapper)
2258  if os.path.exists(self.conf.argdict['outputArchFile'].value[0]):
2259  #appending input file(s) to existing archive
2260  print("zf = zipfile.ZipFile('{}', mode='a', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
2261  else:
2262  #creating new archive
2263  print("zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
2264  print("for f in {}:".format(self.conf.argdict['inputDataFile'].value), file=zip_wrapper)
2265  #This module gives false positives (as of python 3.7.0). Will also check the name for ".zip"
2266  #print >> zip_wrapper, " if zipfile.is_zipfile(f):"
2267  print(" if zipfile.is_zipfile(f) and '.zip' in f:", file=zip_wrapper)
2268  print(" archive = zipfile.ZipFile(f, mode='r')", file=zip_wrapper)
2269  print(" print 'Extracting input zip file {0} to temporary directory {1}'.format(f,'tmp')", file=zip_wrapper)
2270  print(" archive.extractall('tmp')", file=zip_wrapper)
2271  print(" archive.close()", file=zip_wrapper)
2272  # remove stuff as soon as it is saved to output in order to save disk space at worker node
2273  print(" if os.access(f, os.F_OK):", file=zip_wrapper)
2274  print(" print 'Removing input zip file {}'.format(f)", file=zip_wrapper)
2275  print(" os.unlink(f)", file=zip_wrapper)
2276  print(" if os.path.isdir('tmp'):", file=zip_wrapper)
2277  print(" for root, dirs, files in os.walk('tmp'):", file=zip_wrapper)
2278  print(" for name in files:", file=zip_wrapper)
2279  print(" print 'Zipping {}'.format(name)", file=zip_wrapper)
2280  print(" zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
2281  print(" shutil.rmtree('tmp')", file=zip_wrapper)
2282  print(" else:", file=zip_wrapper)
2283  print(" print 'Zipping {}'.format(os.path.basename(f))", file=zip_wrapper)
2284  print(" zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
2285  print(" if os.access(f, os.F_OK):", file=zip_wrapper)
2286  print(" print 'Removing input file {}'.format(f)", file=zip_wrapper)
2287  print(" os.unlink(f)", file=zip_wrapper)
2288  print("zf.close()", file=zip_wrapper)
2289  os.chmod('zip_wrapper.py', 0o755)
2290  except OSError as e:
2291  errMsg = 'error writing zip wrapper {fileName}: {error}'.format(fileName = 'zip_wrapper.py',
2292  error = e
2293  )
2294  msg.error(errMsg)
2295  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
2296  errMsg
2297  )
2298  self._cmd.append('zip_wrapper.py')
2299 
2300  #unarchiving
2301  elif self._exe == 'unarchive':
2302  import zipfile
2303  for infile in self.conf.argdict['inputArchFile'].value:
2304  if not zipfile.is_zipfile(infile):
2305  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
2306  'An input file is not a zip archive - aborting unpacking')
2307  self._cmd = ['python']
2308  try:
2309  with open('unarchive_wrapper.py', 'w') as unarchive_wrapper:
2310  print("import zipfile", file=unarchive_wrapper)
2311  print("for f in {}:".format(self.conf.argdict['inputArchFile'].value), file=unarchive_wrapper)
2312  print(" archive = zipfile.ZipFile(f, mode='r')", file=unarchive_wrapper)
2313  print(" path = '{}'".format(self.conf.argdict['path']), file=unarchive_wrapper)
2314  print(" print 'Extracting archive {0} to {1}'.format(f,path)", file=unarchive_wrapper)
2315  print(" archive.extractall(path)", file=unarchive_wrapper)
2316  print(" archive.close()", file=unarchive_wrapper)
2317  os.chmod('unarchive_wrapper.py', 0o755)
2318  except OSError as e:
2319  errMsg = 'error writing unarchive wrapper {fileName}: {error}'.format(fileName = 'unarchive_wrapper.py',
2320  error = e
2321  )
2322  msg.error(errMsg)
2323  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
2324  errMsg
2325  )
2326  self._cmd.append('unarchive_wrapper.py')
2327  super(archiveExecutor, self).preExecute(input=input, output=output)
2328 
python.trfExe.logscanExecutor._logFileName
_logFileName
Definition: trfExe.py:510
python.trfExe.transformExecutor._isValidated
_isValidated
Definition: trfExe.py:176
python.trfExe.echoExecutor.execute
def execute(self)
Definition: trfExe.py:580
python.trfExe.athenaExecutor._errorMaskFiles
_errorMaskFiles
Definition: trfExe.py:894
python.trfExe.transformExecutor._hasValidated
_hasValidated
Definition: trfExe.py:175
python.trfExe.athenaExecutor
Definition: trfExe.py:844
python.trfExe.athenaExecutor._runtimeRunargs
_runtimeRunargs
Definition: trfExe.py:891
python.trfExe.executorConfig.addToArgdict
def addToArgdict(self, key, value)
Add a new object to the argdict.
Definition: trfExe.py:118
python.trfUtils.isInteractiveEnv
def isInteractiveEnv()
Definition: trfUtils.py:703
python.trfExe.DQMPostProcessExecutor
Specialist execution class for DQM post-processing of histograms.
Definition: trfExe.py:2029
python.trfExe.athenaExecutor.disableMT
def disableMT(self)
Definition: trfExe.py:950
python.trfExe.executorConfig._argdict
_argdict
Definition: trfExe.py:64
python.trfExe.transformExecutor._athenaConcurrentEvents
_athenaConcurrentEvents
Definition: trfExe.py:195
python.trfExe.scriptExecutor._exeLogFile
_exeLogFile
Definition: trfExe.py:697
python.trfExe.transformExecutor.errMsg
def errMsg(self)
Definition: trfExe.py:304
python.trfExe.athenaExecutor._setupFile
_setupFile
Definition: trfExe.py:1570
python.trfExe.athenaExecutor._prepAthenaCommandLine
def _prepAthenaCommandLine(self)
Prepare the correct command line to be used to invoke athena.
Definition: trfExe.py:1451
python.trfExe.athenaExecutor._extraRunargs
_extraRunargs
Definition: trfExe.py:890
python.trfExe.transformExecutor.execute
def execute(self)
Definition: trfExe.py:475
python.trfExe.DQMPostProcessExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2039
python.trfMPTools.athenaMPOutputHandler
def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks=False, argdict={})
Handle AthenaMP outputs, updating argFile instances to real.
Definition: trfMPTools.py:70
python.trfExe.DQMergeExecutor.validate
def validate(self)
Definition: trfExe.py:1986
python.trfExe.echoExecutor.__init__
def __init__(self, name='Echo', trf=None)
Definition: trfExe.py:574
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
python.trfExe.transformExecutor.myMerger
def myMerger(self)
Now define properties for these data members.
Definition: trfExe.py:207
python.trfExe.transformExecutor
Executors always only even execute a single step, as seen by the transform.
Definition: trfExe.py:127
python.trfExe.athenaExecutor._inputDataTypeCountCheck
_inputDataTypeCountCheck
Definition: trfExe.py:895
python.trfExe.DQMergeExecutor.__init__
def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']), exe='DQHistogramMerge.py', exeArgs=[], memMonitor=True)
Definition: trfExe.py:1939
python.trfExe.transformExecutor._memFullFile
_memFullFile
Definition: trfExe.py:191
python.trfExe.NTUPMergeExecutor
Specialist execution class for merging NTUPLE files.
Definition: trfExe.py:2128
python.trfExe.logscanExecutor.validate
def validate(self)
Definition: trfExe.py:518
python.trfExe.executorConfig.setFromTransform
def setFromTransform(self, trf)
Set configuration properties from the parent transform.
Definition: trfExe.py:113
python.trfExe.POOLMergeExecutor.__init__
def __init__(self, name='hybridPOOLMerge', trf=None, conf=None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton', inData=set(), outData=set(), exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, manualDataDictionary=None, memMonitor=True)
Initialise hybrid POOL merger athena executor.
Definition: trfExe.py:1853
python.trfArgClasses.argPOOLFile
POOL file class.
Definition: trfArgClasses.py:1440
python.trfExe.transformExecutor.preExeStartTimes
def preExeStartTimes(self)
Definition: trfExe.py:332
python.trfExceptions.TransformSetupException
Setup exceptions.
Definition: trfExceptions.py:42
python.trfExe.POOLMergeExecutor.execute
def execute(self)
Definition: trfExe.py:1869
python.trfExe.athenaExecutor._skeleton
_skeleton
Definition: trfExe.py:909
python.trfExe.athenaExecutor._skeletonCA
_skeletonCA
Definition: trfExe.py:901
python.trfExe.transformExecutor.cpuTimeTotal
def cpuTimeTotal(self)
Definition: trfExe.py:430
vtune_athena.format
format
Definition: vtune_athena.py:14
python.trfExe.transformExecutor.validate
def validate(self)
Definition: trfExe.py:489
python.trfExe.executorConfig.dataDictionary
def dataDictionary(self)
Definition: trfExe.py:79
python.trfExe.transformExecutor._validation
_validation
Definition: trfExe.py:313
python.trfExe.transformExecutor.inData
inData
Definition: trfExe.py:251
python.trfExe.athenaExecutor._athenaMPReadEventOrders
_athenaMPReadEventOrders
Definition: trfExe.py:1112
python.trfExe.DQMergeExecutor
Specialist execution class for merging DQ histograms.
Definition: trfExe.py:1938
python.trfExe.dummyExecutor.execute
def execute(self)
Definition: trfExe.py:602
python.trfExe.logscanExecutor.__init__
def __init__(self, name='Logscan')
Definition: trfExe.py:507
python.trfExe.DQMergeExecutor._histMergeList
_histMergeList
Definition: trfExe.py:1941
python.trfExe.logscanExecutor
Special executor that will enable a logfile scan as part of its validation.
Definition: trfExe.py:506
python.trfExe.transformExecutor._myMerger
_myMerger
Definition: trfExe.py:202
python.trfExe.transformExecutor._rc
_rc
Definition: trfExe.py:171
PyJobTransforms.trfJobOptions
Contains functions related Athena Job Options files.
python.trfExe.transformExecutor._valStop
_valStop
Definition: trfExe.py:188
index
Definition: index.py:1
python.trfExe.DQMPostProcessExecutor.validate
def validate(self)
Definition: trfExe.py:2086
python.trfExe.scriptExecutor.validate
def validate(self)
Definition: trfExe.py:801
python.trfExe.transformExecutor.athenaMP
def athenaMP(self)
Definition: trfExe.py:452
python.trfExe.bsMergeExecutor
Specalise the script executor to deal with the BS merge oddity of excluding empty DRAWs.
Definition: trfExe.py:2154
python.trfExe.athenaExecutor._onlyMPWithRunargs
_onlyMPWithRunargs
Definition: trfExe.py:900
min
constexpr double min()
Definition: ap_fixedTest.cxx:26
python.trfUtils.calcCpuTime
def calcCpuTime(start, stop)
Definition: trfUtils.py:1684
python.trfExe.athenaExecutor._asetup
_asetup
Definition: trfExe.py:1564
python.trfUtils.VTuneCommand
def VTuneCommand(defaultOptions=True, extraOptionsList=None, AthenaSerialisedConfigurationFile="athenaConf.pkl", returnFormat="string")
return VTune command @detail This function returns a VTune command for use with Athena.
Definition: trfUtils.py:1643
python.trfExe.athenaExecutor._topOptionsFiles
_topOptionsFiles
Handle MPI setup.
Definition: trfExe.py:1188
python.trfExe.transformExecutor._hasExecuted
_hasExecuted
Definition: trfExe.py:170
python.trfExe.transformExecutor._eventCount
_eventCount
Definition: trfExe.py:192
python.trfExe.executorConfig.firstExecutor
def firstExecutor(self)
Definition: trfExe.py:87
python.trfExe.scriptExecutor._echoOutput
_echoOutput
Definition: trfExe.py:632
python.trfExe.transformExecutor.output
def output(self)
Definition: trfExe.py:285
python.trfUtils.ValgrindCommand
def ValgrindCommand(defaultOptions=True, extraOptionsList=None, AthenaSerialisedConfigurationFile="athenaConf.pkl", returnFormat="string")
Definition: trfUtils.py:1577
python.trfExe.logscanExecutor._logScan
_logScan
TODO: This is a cut'n'paste from the athenaExecutor We really should factorise this and use it common...
Definition: trfExe.py:537
python.trfExe._encoding_stream
def _encoding_stream(s)
Definition: trfExe.py:45
python.trfExe.transformExecutor.validationWallTime
def validationWallTime(self)
Definition: trfExe.py:423
python.trfExe.POOLMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:1864
python.trfExe.scriptExecutor._echologger
_echologger
Definition: trfExe.py:692
python.trfExe.scriptExecutor._echostream
_echostream
Definition: trfExe.py:702
python.trfMTTools.detectAthenaMTThreads
def detectAthenaMTThreads(argdict={}, currentSubstep='', legacyThreadingRelease=False)
Detect if AthenaMT has been requested.
Definition: trfMTTools.py:23
python.trfExe.reductionFrameworkExecutor
Specialist executor to manage the handling of multiple implicit input and output files within the der...
Definition: trfExe.py:1876
python.trfExe.bsMergeExecutor._mergeBSLogfile
_mergeBSLogfile
Definition: trfExe.py:2182
python.trfExe.transformExecutor.postExeWallTime
def postExeWallTime(self)
Definition: trfExe.py:409
athena.value
value
Definition: athena.py:124
python.trfExe.transformExecutor.memStats
def memStats(self)
Definition: trfExe.py:394
python.trfExe.reductionFrameworkExecutor.preExecute
def preExecute(self, input=set(), output=set())
Take inputDAODFile and setup the actual outputs needed in this job.
Definition: trfExe.py:1879
python.trfExe.transformExecutor.preExeCpuTime
def preExeCpuTime(self)
Definition: trfExe.py:352
PyJobTransforms.trfArgClasses
Transform argument class definitions.
python.trfExe.transformExecutor.__init__
def __init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Base class initaliser for transform executors.
Definition: trfExe.py:138
python.trfExe.transformExecutor._outData
_outData
Definition: trfExe.py:145
PyJobTransforms.trfExitCodes
Module for transform exit codes.
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.trfExe.scriptExecutor._exe
_exe
Definition: trfExe.py:622
python.trfEnv.environmentUpdate
Class holding the update to an environment that will be passed on to an executor.
Definition: trfEnv.py:15
python.trfExe.transformExecutor.extraMetadata
def extraMetadata(self)
Definition: trfExe.py:292
intersection
std::vector< std::string > intersection(std::vector< std::string > &v1, std::vector< std::string > &v2)
Definition: compareFlatTrees.cxx:25
python.trfExe.transformExecutor.setValStart
def setValStart(self)
Definition: trfExe.py:466
python.trfUtils.reportEventsPassedSimFilter
def reportEventsPassedSimFilter(log)
summarize events passed the ISF_SimEventFilter @detail this function sums up all events passed the IS...
Definition: trfUtils.py:1717
python.trfExe.transformExecutor._extraMetadata
_extraMetadata
Definition: trfExe.py:181
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
python.trfExe.bsMergeExecutor._expectedOutput
_expectedOutput
Definition: trfExe.py:2232
python.trfExe.athenaExecutor.validate
def validate(self)
Definition: trfExe.py:1321
python.trfExe.athenaExecutor._originalCmd
_originalCmd
Definition: trfExe.py:1563
python.trfExe.executorConfig.addToDataDictionary
def addToDataDictionary(self, key, value)
Add a new object to the dataDictionary.
Definition: trfExe.py:122
python.trfExe.DQMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:1948
python.trfExe.executorConfig.totalExecutorSteps
def totalExecutorSteps(self)
Definition: trfExe.py:103
python.trfExe.archiveExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2245
python.trfExe.transformExecutor.valStartTimes
def valStartTimes(self)
Definition: trfExe.py:344
python.trfExe.transformExecutor.hasExecuted
def hasExecuted(self)
Definition: trfExe.py:296
python.trfExe.athenaExecutor._envUpdate
_envUpdate
Add input/output file information - this can't be done in init as we don't know what our inputs and o...
Definition: trfExe.py:1226
python.trfExe.bsMergeExecutor._doRename
_doRename
Definition: trfExe.py:2196
python.trfExe.transformExecutor._exeStop
_exeStop
Definition: trfExe.py:187
pool::DbPrintLvl::setLevel
void setLevel(MsgLevel l)
Definition: DbPrint.h:32
python.trfExe.dummyExecutor
Definition: trfExe.py:595
python.trfExe.athenaExecutor._wrapperFile
_wrapperFile
Definition: trfExe.py:1569
python.trfExceptions.TransformExecutionException
Base class for execution exceptions.
Definition: trfExceptions.py:62
python.trfExe.transformExecutor.wallTime
def wallTime(self)
Definition: trfExe.py:387
PixelModuleFeMask_create_db.remove
string remove
Definition: PixelModuleFeMask_create_db.py:83
python.trfExe.scriptExecutor.execute
def execute(self)
Definition: trfExe.py:723
python.trfExe.transformExecutor.wallTimeTotal
def wallTimeTotal(self)
Definition: trfExe.py:437
python.trfExe.transformExecutor._valStart
_valStart
Definition: trfExe.py:188
python.trfExe.scriptExecutor.exeArgs
def exeArgs(self)
Definition: trfExe.py:650
python.trfExe.executorConfig._firstExecutor
_firstExecutor
Definition: trfExe.py:66
python.trfExe.transformExecutor.inDataUpdate
def inDataUpdate(self, value)
Definition: trfExe.py:245
python.trfUtils.setupDBRelease
def setupDBRelease(setup)
Run a DBRelease setup.
Definition: trfUtils.py:543
python.trfExe.executorConfig.__init__
def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False)
Configuration for an executor.
Definition: trfExe.py:63
python.trfExe.executorConfig._executorStep
_executorStep
Definition: trfExe.py:67
python.trfExe.athenaExecutor.name
name
Definition: trfExe.py:1295
python.trfExe.DQMPostProcessExecutor.__init__
def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']), exe='DQM_Tier0Wrapper_tf.py', exeArgs=[], memMonitor=True)
Definition: trfExe.py:2030
python.trfExe.transformExecutor._containerSetup
_containerSetup
Definition: trfExe.py:199
python.trfExe.transformExecutor.eventCount
def eventCount(self)
Definition: trfExe.py:444
python.trfExe.athenaExecutor._workdir
_workdir
Definition: trfExe.py:1567
fillPileUpNoiseLumi.next
next
Definition: fillPileUpNoiseLumi.py:52
python.trfExe.scriptExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:658
PyJobTransforms.trfMPTools
Utilities for handling AthenaMP jobs.
python.trfExe.logscanExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:512
python.trfExe.athenaExecutor._smartMerge
def _smartMerge(self, fileArg)
Manage smart merging of output files.
Definition: trfExe.py:1737
python.trfUtils.asetupReleaseIsOlderThan
def asetupReleaseIsOlderThan(asetup_string, major, minor=None)
Test (to the best of our knowledge) if the asetup release is older than a major, minor version number...
Definition: trfUtils.py:303
python.trfValidation.ignorePatterns
Class of patterns that can be ignored from athena logfiles.
Definition: trfValidation.py:100
python.trfExe.logscanExecutor._errorMaskFiles
_errorMaskFiles
Definition: trfExe.py:509
python.trfExe.transformExecutor.outDataUpdate
def outDataUpdate(self, value)
Definition: trfExe.py:265
python.trfExe.transformExecutor.conf
conf
Executor configuration:
Definition: trfExe.py:163
python.trfExe.athenaExecutor._tryDropAndReload
_tryDropAndReload
Definition: trfExe.py:889
python.trfExe.athenaExecutor._writeAthenaWrapper
def _writeAthenaWrapper(self, asetup=None, dbsetup=None, ossetup=None)
Write a wrapper script which runs asetup and then Athena.
Definition: trfExe.py:1562
python.trfExe.transformExecutor.input
def input(self)
Definition: trfExe.py:276
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:194
python.trfExe.scriptExecutor
Definition: trfExe.py:619
python.trfExe.athenaExecutor._literalRunargs
_literalRunargs
Definition: trfExe.py:892
python.trfExe.executorConfig._dataDictionary
_dataDictionary
Definition: trfExe.py:65
add
bool add(const std::string &hname, TKey *tobj)
Definition: fastadd.cxx:55
python.trfArgClasses.argSubstepList
Argument class for substep lists, suitable for preExec/postExec.
Definition: trfArgClasses.py:2056
python.trfExe.athenaExecutor._athenaMPStrategy
_athenaMPStrategy
Definition: trfExe.py:1118
python.trfValidation.athenaLogFileReport
Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGL...
Definition: trfValidation.py:211
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.trfExe.transformExecutor.postExeCpuTime
def postExeCpuTime(self)
Definition: trfExe.py:402
python.trfExe.transformExecutor._athenaMP
_athenaMP
Definition: trfExe.py:193
python.trfExe.executorConfig.executorStep
def executorStep(self)
Definition: trfExe.py:95
python.trfExe.echoExecutor
Definition: trfExe.py:573
python.trfExe.transformExecutor._exeStart
_exeStart
Definition: trfExe.py:187
python.trfExe.athenaExecutor._logScan
_logScan
Definition: trfExe.py:1372
python.trfExe.transformExecutor.exeStopTimes
def exeStopTimes(self)
Definition: trfExe.py:340
python.trfExe.athenaExecutor._disableMP
_disableMP
Definition: trfExe.py:897
python.trfExe.scriptExecutor._memSummaryFile
_memSummaryFile
Definition: trfExe.py:749
python.trfExe.scriptExecutor.__init__
def __init__(self, name='Script', trf=None, conf=None, inData=set(), outData=set(), exe=None, exeArgs=None, memMonitor=True)
Definition: trfExe.py:620
python.trfExe.athenaExecutor.disableMP
def disableMP(self)
Definition: trfExe.py:942
python.trfExe.scriptExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:788
PyJobTransforms.trfValidation
Validation control for job transforms.
beamspotman.dir
string dir
Definition: beamspotman.py:621
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
python.trfExceptions.TransformValidationException
Group of validation exceptions.
Definition: trfExceptions.py:50
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
python.trfExe.transformExecutor.dbMonitor
def dbMonitor(self)
Definition: trfExe.py:456
python.trfExe.transformExecutor.memAnalysis
def memAnalysis(self)
Definition: trfExe.py:398
python.trfExe.scriptExecutor._buildStandardCommand
def _buildStandardCommand(self)
Definition: trfExe.py:706
python.trfExe.executorConfig.argdict
def argdict(self)
Definition: trfExe.py:71
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.trfExe.optionalAthenaExecutor
Athena executor where failure is not consisered fatal.
Definition: trfExe.py:1824
python.trfExe.athenaExecutor._isCAEnabled
def _isCAEnabled(self)
Check if running with CA.
Definition: trfExe.py:1431
python.trfExe.bsMergeExecutor._outputFilename
_outputFilename
Definition: trfExe.py:2194
python.trfExe.executorConfig._totalExecutorSteps
_totalExecutorSteps
Definition: trfExe.py:68
python.trfExe.transformExecutor._athenaMT
_athenaMT
Definition: trfExe.py:194
python.trfExe.athenaExecutor._substep
_substep
Definition: trfExe.py:887
python.trfExe.dummyExecutor.__init__
def __init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Base class initaliser for transform executors.
Definition: trfExe.py:596
python.trfExe.transformExecutor.valStopTimes
def valStopTimes(self)
Definition: trfExe.py:348
PyJobTransforms.trfMTTools
Utilities for handling AthenaMT jobs.
python.trfExe.transformExecutor._preExeStart
_preExeStart
Definition: trfExe.py:186
python.trfExe.transformExecutor.trf
def trf(self)
Definition: trfExe.py:225
python.trfExe.archiveExecutor
Archive transform.
Definition: trfExe.py:2243
python.trfExe.athenaExecutor._jobOptionsTemplate
_jobOptionsTemplate
Definition: trfExe.py:921
python.trfExe.athenaExecutor._onlyMP
_onlyMP
Do we need to run asetup first? For a certain substep (e.g.
Definition: trfExe.py:898
python.trfExe.athenaExecutor._athenaMPEventOrdersFile
_athenaMPEventOrdersFile
Definition: trfExe.py:1110
python.trfExe.transformExecutor.sysTime
def sysTime(self)
Definition: trfExe.py:380
python.trfExeStepTools.commonExecutorStepName
def commonExecutorStepName(name)
Definition: trfExeStepTools.py:7
python.trfExe.scriptExecutor._input
_input
Definition: trfExe.py:662
python.trfExe.transformExecutor.cpuTime
def cpuTime(self)
Definition: trfExe.py:366
python.trfMPTools.detectAthenaMPProcs
def detectAthenaMPProcs(argdict={}, currentSubstep='', legacyThreadingRelease=False)
Detect if AthenaMP has been requested.
Definition: trfMPTools.py:27
python.trfExe.athenaExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:1241
python.trfExe.athenaExecutor.onlyMT
def onlyMT(self)
Definition: trfExe.py:966
python.trfUtils.calcWallTime
def calcWallTime(start, stop)
Definition: trfUtils.py:1692
python.trfExe.transformExecutor._alreadyInContainer
_alreadyInContainer
Definition: trfExe.py:198
python.trfUtils.cvmfsDBReleaseCheck
def cvmfsDBReleaseCheck(dbrelease)
Validate a DBRelease exists on cvmfs and return the path to the setup script.
Definition: trfUtils.py:569
Trk::open
@ open
Definition: BinningType.h:40
python.trfExe.DQMPostProcessExecutor._histMergeList
_histMergeList
Definition: trfExe.py:2032
python.trfExe.transformExecutor.isValidated
def isValidated(self)
Definition: trfExe.py:320
python.trfExe.transformExecutor._errMsg
_errMsg
Definition: trfExe.py:172
python.trfExe.transformExecutor._inData
_inData
Definition: trfExe.py:144
python.trfExe.transformExecutor.setPreExeStart
def setPreExeStart(self)
Definition: trfExe.py:461
PyJobTransforms.trfMPITools
Utilities for handling MPI-Athena jobs.
ActsTrk::detail::MakeDerivedVariant::extend
constexpr std::variant< Args..., T > extend(const std::variant< Args... > &, const T &)
Definition: MakeDerivedVariant.h:17
python.trfExe.executorConfig
Definition: trfExe.py:57
python.trfExe.athenaExecutor.__init__
def __init__(self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
Initialise athena executor.
Definition: trfExe.py:885
PyJobTransforms.trfUtils
Transform utility functions.
python.trfExe.athenaExecutor._targzipJiveXML
def _targzipJiveXML(self)
Definition: trfExe.py:1801
python.trfExe.athenaExecutor.onlyMP
def onlyMP(self)
Definition: trfExe.py:958
python.trfExe.transformExecutor.preExeWallTime
def preExeWallTime(self)
Definition: trfExe.py:359
python.trfExe.bsMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2156
python.trfExe.POOLMergeExecutor
Definition: trfExe.py:1841
PyJobTransforms.trfLogger
Logging configuration for ATLAS job transforms.
python.trfExe.transformExecutor.first
def first(self)
Definition: trfExe.py:325
python.trfExe.transformExecutor.rc
def rc(self)
Definition: trfExe.py:300
python.trfExe.transformExecutor.usrTime
def usrTime(self)
Definition: trfExe.py:373
python.trfExe.transformExecutor.outData
outData
Definition: trfExe.py:271
python.trfExe.bsMergeExecutor._inputBS
_inputBS
Definition: trfExe.py:2158
python.trfExe.transformExecutor._dbMonitor
_dbMonitor
Definition: trfExe.py:196
python.trfExceptions.TransformLogfileErrorException
Exception class for validation failures detected by parsing logfiles.
Definition: trfExceptions.py:58
python.trfExe.optionalAthenaExecutor.validate
def validate(self)
Definition: trfExe.py:1827
python.trfExe.scriptExecutor._memMonitor
_memMonitor
Definition: trfExe.py:637
python.trfExe.athenaExecutor._athenaMPWorkerTopDir
_athenaMPWorkerTopDir
Definition: trfExe.py:1108
python.trfExe.athenaExecutor.inputDataTypeCountCheck
def inputDataTypeCountCheck(self)
Definition: trfExe.py:930
python.trfExe.bsMergeExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:2228
pickleTool.object
object
Definition: pickleTool.py:29
python.trfExe.bsMergeExecutor._useStubFile
_useStubFile
Definition: trfExe.py:2161
python.trfExe.scriptExecutor._cmd
_cmd
Definition: trfExe.py:635
python.trfExe.athenaExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:973
str
Definition: BTagTrackIpAccessor.cxx:11
python.trfExe.athenaExecutor._inputEventTest
_inputEventTest
Definition: trfExe.py:888
python.trfExe.transformExecutor.name
def name(self)
Definition: trfExe.py:211
python.trfExe.NTUPMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2130
python.trfExe.transformExecutor.validation
def validation(self)
Definition: trfExe.py:308
python.trfExe.transformExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:486
python.trfExe.transformExecutor.substep
def substep(self)
Definition: trfExe.py:219
python.trfExe.athenaExecutor._onlyMT
_onlyMT
Definition: trfExe.py:899
python.trfExe.scriptExecutor._exeArgs
_exeArgs
Definition: trfExe.py:625
python.trfExe.scriptExecutor._logFileName
_logFileName
Definition: trfExe.py:671
python.trfExe.athenaExecutor._dbsetup
_dbsetup
Definition: trfExe.py:1565
python.trfExe.scriptExecutor.exe
def exe(self)
Definition: trfExe.py:641
python.trfExe.transformExecutor.hasValidated
def hasValidated(self)
Definition: trfExe.py:316
python.trfExe.transformExecutor.doAll
def doAll(self, input=set(), output=set())
Convenience function.
Definition: trfExe.py:499
python.trfExe.scriptExecutor._output
_output
Definition: trfExe.py:663
python.trfExe.transformExecutor._memStats
_memStats
Definition: trfExe.py:189
python.trfExe.transformExecutor._name
_name
Definition: trfExe.py:141
python.trfExe.bsMergeExecutor._maskedFiles
_maskedFiles
Definition: trfExe.py:2160
python.trfUtils.forceToAlphaNum
def forceToAlphaNum(string)
Strip a string down to alpha-numeric characters only.
Definition: trfUtils.py:443
python.trfExe.athenaExecutor._disableMT
_disableMT
Definition: trfExe.py:896
python.ParticleTypeUtil.info
def info
Definition: ParticleTypeUtil.py:87
python.trfExe.transformExecutor._memLeakResult
_memLeakResult
Definition: trfExe.py:190
python.trfExe.athenaExecutor._perfMonFile
_perfMonFile
Definition: trfExe.py:904
python.trfExe.athenaExecutor._dataArgs
_dataArgs
Definition: trfExe.py:893
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
python.trfValidation.scriptLogFileReport
Definition: trfValidation.py:709
python.trfUtils.unpackDBRelease
def unpackDBRelease(tarball, dbversion=None)
Ensure that the DBRelease tarball has been unpacked.
Definition: trfUtils.py:520
python.trfExe.bsMergeExecutor._outputBS
_outputBS
Definition: trfExe.py:2159
python.trfExe.transformExecutor.validationCpuTime
def validationCpuTime(self)
Definition: trfExe.py:416
python.trfExe.transformExecutor._resimevents
_resimevents
Definition: trfExe.py:197
python.trfUtils.asetupReport
def asetupReport()
Return a string with a report of the current athena setup.
Definition: trfUtils.py:223
python.trfExe.athenaExecutor.substep
def substep(self)
Definition: trfExe.py:938
python.trfExe.transformExecutor._trf
_trf
Definition: trfExe.py:232
python.trfExe.transformExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:471
python.trfExe.athenaExecutor._athenaMPFileReport
_athenaMPFileReport
Definition: trfExe.py:1109
python.trfExe.transformExecutor.exeStartTimes
def exeStartTimes(self)
Definition: trfExe.py:336
python.trfExe.transformExecutor.reSimEvent
def reSimEvent(self)
Definition: trfExe.py:448
python.trfExe.bsMergeExecutor._mergeBSFileList
_mergeBSFileList
Definition: trfExe.py:2181
python.trfValidation.eventMatch
Small class used for vailiadating event counts between input and output files.
Definition: trfValidation.py:943
python.trfExe.bsMergeExecutor.execute
def execute(self)
Definition: trfExe.py:2212