ATLAS Offline Software
trfExe.py
# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration

import json
import math
import os
import os.path as path
import re
import signal
import subprocess
import sys
import time

import logging
from fnmatch import fnmatch
msg = logging.getLogger(__name__)

from PyJobTransforms.trfJobOptions import JobOptionsTemplate
from PyJobTransforms.trfUtils import asetupReport, asetupReleaseIsOlderThan, unpackDBRelease, setupDBRelease, \
    cvmfsDBReleaseCheck, forceToAlphaNum, \
    ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix
from PyJobTransforms.trfExitCodes import trfExit
from PyJobTransforms.trfLogger import stdLogLevels
from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler
from PyJobTransforms.trfMTTools import detectAthenaMTThreads

import PyJobTransforms.trfExceptions as trfExceptions
import PyJobTransforms.trfValidation as trfValidation
import PyJobTransforms.trfArgClasses as trfArgClasses
import PyJobTransforms.trfEnv as trfEnv
import PyJobTransforms.trfMPITools as mpi

# Depending on the setting of LANG, sys.stdout may end up with ascii or ansi
# encoding, rather than utf-8. But Athena uses unicode for some log messages
# (another example of Gell-Mann's totalitarian principle) and that will result
# in a crash with python 3. In such a case, force the use of a utf-8 encoded
# output stream instead.
def _encoding_stream(s):
    enc = s.encoding.lower()
    if enc.find('ascii') >= 0 or enc.find('ansi') >= 0:
        return open(s.fileno(), 'w', encoding='utf-8')
    return s
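
# Illustrative usage sketch (not part of the original listing): wrap sys.stdout
# before handing it to a logging.StreamHandler, so that unicode log messages
# survive an ASCII/ANSI locale. scriptExecutor.preExecute() below does exactly
# this when it enables command echoing:
#
#     handler = logging.StreamHandler(_encoding_stream(sys.stdout))
#     handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))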


class executorConfig(object):

    def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False):
        self._argdict = argdict
        self._dataDictionary = dataDictionary
        self._firstExecutor = firstExecutor
        self._executorStep = -1
        self._totalExecutorSteps = 0

    @property
    def argdict(self):
        return self._argdict

    @argdict.setter
    def argdict(self, value):
        self._argdict = value

    @property
    def dataDictionary(self):
        return self._dataDictionary

    @dataDictionary.setter
    def dataDictionary(self, value):
        self._dataDictionary = value

    @property
    def firstExecutor(self):
        return self._firstExecutor

    @firstExecutor.setter
    def firstExecutor(self, value):
        self._firstExecutor = value

    @property
    def executorStep(self):
        return self._executorStep

    @executorStep.setter
    def executorStep(self, value):
        self._executorStep = value

    @property
    def totalExecutorSteps(self):
        return self._totalExecutorSteps

    @totalExecutorSteps.setter
    def totalExecutorSteps(self, value):
        self._totalExecutorSteps = value

    # Set configuration properties from the parent transform
    def setFromTransform(self, trf):
        self._argdict = trf.argdict
        self._dataDictionary = trf.dataDictionary

    def addToArgdict(self, key, value):
        self._argdict[key] = value

    def addToDataDictionary(self, key, value):
        self._dataDictionary[key] = value

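# Minimal usage sketch (illustrative only; names other than executorConfig,
# addToArgdict and addToDataDictionary are hypothetical):
#
#     conf = executorConfig(argdict={}, dataDictionary={}, firstExecutor=True)
#     conf.addToArgdict('maxEvents', myMaxEventsArg)   # myMaxEventsArg: a trfArgClasses instance
#     conf.addToDataDictionary('ESD', myESDArg)        # myESDArg: a data argument
#     executor.conf = conf                             # or conf.setFromTransform(trf)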


# Base class for transform executors
class transformExecutor(object):

    def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
        # Some information to produce helpful log messages

        self._name = forceToAlphaNum(name)
        # Data this executor can start from and produce
        # Note we transform NULL to inNULL and outNULL as a convenience
        self._inData = set(inData)
        self._outData = set(outData)
        if 'NULL' in self._inData:
            self._inData.remove('NULL')
            self._inData.add('inNULL')
        if 'NULL' in self._outData:
            self._outData.remove('NULL')
            self._outData.add('outNULL')

        # It's forbidden for an executor to consume and produce the same datatype
        dataOverlap = self._inData & self._outData
        if len(dataOverlap) > 0:
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
                                                        'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self._name, ' '.join(dataOverlap)))

        # Executor configuration: if no conf object is given, build one,
        # optionally taking settings from the parent transform
        if conf is not None:
            self.conf = conf
        else:
            self.conf = executorConfig()
            if trf is not None:
                self.conf.setFromTransform(trf)

        # Execution status
        self._hasExecuted = False
        self._rc = -1
        self._errMsg = None

        # Validation status
        self._hasValidated = False
        self._isValidated = False

        # Extra metadata
        # This dictionary holds extra metadata for this executor which will be
        # provided in job reports
        self._extraMetadata = {}

        # Timing and monitoring holders (os.times() tuples and prmon results)
        self._preExeStart = None
        self._exeStart = self._exeStop = None
        self._valStart = self._valStop = None
        self._memStats = {}
        self._memLeakResult = {}
        self._memFullFile = None
        self._eventCount = None
        self._athenaMP = 0
        self._athenaMT = 0
        self._athenaConcurrentEvents = 0
        self._dbMonitor = None
        self._resimevents = None
        self._alreadyInContainer = False
        self._containerSetup = None

        # Holder for execution information about any merges done by this executor in MP mode
        self._myMerger = []

    @property
    def myMerger(self):
        return self._myMerger

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def substep(self):
        if '_substep' in dir(self):
            return self._substep
        return None

    @property
    def trf(self):
        if '_trf' in dir(self):
            return self._trf
        return None

    @trf.setter
    def trf(self, value):
        self._trf = value

    @property
    def inData(self):
        if '_inData' in dir(self):
            return self._inData
        return None

    @inData.setter
    def inData(self, value):
        self._inData = set(value)

    def inDataUpdate(self, value):
        if '_inData' in dir(self):
            self._inData.update(value)
        else:
            self.inData = value

    @property
    def outData(self):
        if '_outData' in dir(self):
            return self._outData
        return None

    @outData.setter
    def outData(self, value):
        self._outData = set(value)

    def outDataUpdate(self, value):
        if '_outData' in dir(self):
            self._outData.update(value)
        else:
            self.outData = value

    @property
    def input(self):
        if '_input' in dir(self):
            return self._input
        return None

    @property
    def output(self):
        if '_output' in dir(self):
            return self._output
        return None

    @property
    def extraMetadata(self):
        return self._extraMetadata

    @property
    def hasExecuted(self):
        return self._hasExecuted

    @property
    def rc(self):
        return self._rc

    @property
    def errMsg(self):
        return self._errMsg

    @property
    def validation(self):
        return self._validation

    @validation.setter
    def validation(self, value):
        self._validation = value

    @property
    def hasValidated(self):
        return self._hasValidated

    @property
    def isValidated(self):
        return self._isValidated

    @property
    def first(self):
        if hasattr(self, '_first'):
            return self._first
        else:
            return None

    @property
    def preExeStartTimes(self):
        return self._preExeStart

    @property
    def exeStartTimes(self):
        return self._exeStart

    @property
    def exeStopTimes(self):
        return self._exeStop

    @property
    def valStartTimes(self):
        return self._valStart

    @property
    def valStopTimes(self):
        return self._valStop

    @property
    def preExeCpuTime(self):
        if self._preExeStart and self._exeStart:
            return calcCpuTime(self._preExeStart, self._exeStart)
        else:
            return None

    @property
    def preExeWallTime(self):
        if self._preExeStart and self._exeStart:
            return calcWallTime(self._preExeStart, self._exeStart)
        else:
            return None

    @property
    def cpuTime(self):
        if self._exeStart and self._exeStop:
            return calcCpuTime(self._exeStart, self._exeStop)
        else:
            return None

    @property
    def usrTime(self):
        if self._exeStart and self._exeStop:
            return self._exeStop[2] - self._exeStart[2]
        else:
            return None

    @property
    def sysTime(self):
        if self._exeStart and self._exeStop:
            return self._exeStop[3] - self._exeStart[3]
        else:
            return None

    @property
    def wallTime(self):
        if self._exeStart and self._exeStop:
            return calcWallTime(self._exeStart, self._exeStop)
        else:
            return None

    @property
    def memStats(self):
        return self._memStats

    @property
    def memAnalysis(self):
        return self._memLeakResult

    @property
    def postExeCpuTime(self):
        if self._exeStop and self._valStart:
            return calcCpuTime(self._exeStop, self._valStart)
        else:
            return None

    @property
    def postExeWallTime(self):
        if self._exeStop and self._valStart:
            return calcWallTime(self._exeStop, self._valStart)
        else:
            return None

    @property
    def validationCpuTime(self):
        if self._valStart and self._valStop:
            return calcCpuTime(self._valStart, self._valStop)
        else:
            return None

    @property
    def validationWallTime(self):
        if self._valStart and self._valStop:
            return calcWallTime(self._valStart, self._valStop)
        else:
            return None

    @property
    def cpuTimeTotal(self):
        if self._preExeStart and self._valStop:
            return calcCpuTime(self._preExeStart, self._valStop)
        else:
            return None

    @property
    def wallTimeTotal(self):
        if self._preExeStart and self._valStop:
            return calcWallTime(self._preExeStart, self._valStop)
        else:
            return None

    @property
    def eventCount(self):
        return self._eventCount

    @property
    def reSimEvent(self):
        return self._resimevents

    @property
    def athenaMP(self):
        return self._athenaMP

    @property
    def dbMonitor(self):
        return self._dbMonitor

    # set start times, if not set already
    def setPreExeStart(self):
        if self._preExeStart is None:
            self._preExeStart = os.times()
            msg.debug('preExeStart time is {0}'.format(self._preExeStart))

    def setValStart(self):
        if self._valStart is None:
            self._valStart = os.times()
            msg.debug('valStart time is {0}'.format(self._valStart))

    def preExecute(self, input = set(), output = set()):
        self.setPreExeStart()
        msg.info('Preexecute for %s', self._name)

    def execute(self):
        self._exeStart = os.times()
        msg.debug('exeStart time is {0}'.format(self._exeStart))
        msg.info('Starting execution of %s', self._name)
        self._hasExecuted = True
        self._rc = 0
        self._errMsg = ''
        msg.info('%s executor returns %d', self._name, self._rc)
        self._exeStop = os.times()
        msg.debug('exeStop time is {0}'.format(self._exeStop))

    def postExecute(self):
        msg.info('Postexecute for %s', self._name)

    def validate(self):
        self.setValStart()
        self._hasValidated = True
        msg.info('Executor %s has no validation function - assuming all ok', self._name)
        self._isValidated = True
        self._errMsg = ''
        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))

    # Convenience method to run all stages in order
    def doAll(self, input=set(), output=set()):
        self.preExecute(input, output)
        self.execute()
        self.postExecute()
        self.validate()

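# Lifecycle sketch (illustrative, not part of the original file): an executor
# run standalone walks the four stages in the order doAll() encodes, i.e.
# preExecute -> execute -> postExecute -> validate:
#
#     exe = transformExecutor(name='MyStep')
#     exe.doAll()
#     print(exe.hasExecuted, exe.rc, exe.isValidated)   # True 0 True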

class logscanExecutor(transformExecutor):
    def __init__(self, name = 'Logscan'):
        super(logscanExecutor, self).__init__(name=name)
        self._errorMaskFiles = None
        self._logFileName = None

    def preExecute(self, input = set(), output = set()):
        self.setPreExeStart()
        msg.info('Preexecute for %s', self._name)
        if 'logfile' in self.conf.argdict:
            self._logFileName = self.conf.argdict['logfile'].value

    def validate(self):
        self.setValStart()
        msg.info("Starting validation for {0}".format(self._name))
        if self._logFileName:
            # Set up the error ignore patterns: command line first, then this
            # executor's error mask files, then the standard default
            if 'ignorePatterns' in self.conf.argdict:
                igPat = self.conf.argdict['ignorePatterns'].value
            else:
                igPat = []
            if 'ignoreFiles' in self.conf.argdict:
                ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
            elif self._errorMaskFiles is not None:
                ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
            else:
                ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)

            # Now actually scan my logfile
            msg.info('Scanning logfile {0} for errors'.format(self._logFileName))
            self._logScan = trfValidation.athenaLogFileReport(logfile = self._logFileName, ignoreList = ignorePatterns)
            worstError = self._logScan.worstError()

            # In general we add the error message to the exit message, but if it's too long then don't do
            # that and just say look in the jobReport
            if worstError['firstError']:
                if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
                    if 'CoreDumpSvc' in worstError['firstError']['message']:
                        exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
                    elif 'G4Exception' in worstError['firstError']['message']:
                        exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
                    else:
                        exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
                else:
                    exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
            else:
                exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])

            # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
            if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
                msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
            elif worstError['nLevel'] >= stdLogLevels['ERROR']:
                self._isValidated = False
                msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
                raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
                                                                   'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))

        # Must be ok if we got here!
        msg.info('Executor {0} has validated successfully'.format(self.name))
        self._isValidated = True
        self._errMsg = ''

        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))

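# Note (illustrative summary of the precedence coded above):
#   1. --ignoreFiles from the command line, if given;
#   2. this executor's _errorMaskFiles, if set;
#   3. the default athenaExecutor._defaultIgnorePatternFile ('atlas_error_mask.db');
# with --ignorePatterns always added as extra search patterns on top.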

class echoExecutor(transformExecutor):
    def __init__(self, name = 'Echo', trf = None):

        # We are only changing the default name here
        super(echoExecutor, self).__init__(name=name, trf=trf)

    def execute(self):
        self._exeStart = os.times()
        msg.debug('exeStart time is {0}'.format(self._exeStart))
        msg.info('Starting execution of %s', self._name)
        msg.info('Transform argument dictionary now follows:')
        for k, v in self.conf.argdict.items():
            print("%s = %s" % (k, v))
        self._hasExecuted = True
        self._rc = 0
        self._errMsg = ''
        msg.info('%s executor returns %d', self._name, self._rc)
        self._exeStop = os.times()
        msg.debug('exeStop time is {0}'.format(self._exeStop))


class dummyExecutor(transformExecutor):
    def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):

        # We are only changing the default name here
        super(dummyExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)

    def execute(self):
        self._exeStart = os.times()
        msg.debug('exeStart time is {0}'.format(self._exeStart))
        msg.info('Starting execution of %s', self._name)
        for type in self._outData:
            for k, v in self.conf.argdict.items():
                if type in k:
                    msg.info('Creating dummy output file: {0}'.format(self.conf.argdict[k].value[0]))
                    open(self.conf.argdict[k].value[0], 'a').close()
        self._hasExecuted = True
        self._rc = 0
        self._errMsg = ''
        msg.info('%s executor returns %d', self._name, self._rc)
        self._exeStop = os.times()
        msg.debug('exeStop time is {0}'.format(self._exeStop))

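# Sketch of what dummyExecutor.execute() produces (illustrative): for an
# executor declared with outData={'ESD'} and an argdict entry keyed
# 'outputESDFile', the loop above simply touches the requested filename,
# creating an empty placeholder output:
#
#     open('myOutput.ESD.pool.root', 'a').close()   # filename is hypothetical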

class scriptExecutor(transformExecutor):
    def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData = set(),
                 exe = None, exeArgs = None, memMonitor = True):
        # Name of the script we want to execute
        self._exe = exe

        # With arguments (currently this means paste in the corresponding _argdict entry)
        self._exeArgs = exeArgs

        super(scriptExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)

        self._extraMetadata.update({'script' : exe})

        # Decide if we echo script output to stdout
        self._echoOutput = False

        # Can either be written by base class or child
        self._cmd = None

        self._memMonitor = memMonitor

    @property
    def exe(self):
        return self._exe

    @exe.setter
    def exe(self, value):
        self._exe = value
        self._extraMetadata['script'] = value

    @property
    def exeArgs(self):
        return self._exeArgs

    @exeArgs.setter
    def exeArgs(self, value):
        self._exeArgs = value
#        self._extraMetadata['scriptArgs'] = value

    def preExecute(self, input = set(), output = set()):
        self.setPreExeStart()
        msg.debug('scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

        self._input = input
        self._output = output

        if self._cmd is None:
            self._buildStandardCommand()
        msg.info('Will execute script as %s', self._cmd)

        # Define this here to have it for environment detection messages
        self._logFileName = "log.{0}".format(self._name)

        # Decide whether the command output should be echoed to stdout
        if 'TRF_ECHO' in os.environ:
            msg.info('TRF_ECHO envvar is set - enabling command echoing to stdout')
            self._echoOutput = True
        elif 'TRF_NOECHO' in os.environ:
            msg.info('TRF_NOECHO envvar is set - disabling command echoing to stdout')
            self._echoOutput = False
        # PS1 is for sh, bash; prompt is for tcsh and zsh
        elif isInteractiveEnv():
            msg.info('Interactive environment detected (stdio or stdout is a tty) - enabling command echoing to stdout')
            self._echoOutput = True
        elif 'TZHOME' in os.environ:
            msg.info('Tier-0 environment detected - enabling command echoing to stdout')
            self._echoOutput = True
        if self._echoOutput is False:
            msg.info('Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self._name, self._logFileName))

        # Now setup special loggers for logging execution messages to stdout and file
        self._echologger = logging.getLogger(self._name)
        self._echologger.setLevel(logging.INFO)
        self._echologger.propagate = False

        encargs = {'encoding' : 'utf-8'}
        self._exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
        self._exeLogFile.setFormatter(logging.Formatter('%(asctime)s %(message)s', datefmt='%H:%M:%S'))
        self._echologger.addHandler(self._exeLogFile)

        if self._echoOutput:
            self._echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
            self._echostream.setFormatter(logging.Formatter('%(name)s %(asctime)s %(message)s', datefmt='%H:%M:%S'))
            self._echologger.addHandler(self._echostream)

    def _buildStandardCommand(self):
        if self._exe:
            self._cmd = [self.exe, ]
        else:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                            'No executor set in {0}'.format(self.__class__.__name__))
        for arg in self.exeArgs:
            if arg in self.conf.argdict:
                # If we have a list then add each element to our list, else just str() the argument value
                # Note if there are arguments which need more complex transformations then
                # consider introducing a special toExeArg() method.
                if isinstance(self.conf.argdict[arg].value, list):
                    self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
                else:
                    self._cmd.append(str(self.conf.argdict[arg].value))

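    # Illustrative sketch of _buildStandardCommand (names are hypothetical):
    # with exe='myScript.sh' and exeArgs=['inputFile'], an argdict entry whose
    # value is the list ['a.dat', 'b.dat'] yields
    #
    #     self._cmd == ['myScript.sh', 'a.dat', 'b.dat']
    #
    # while a scalar argument value is simply str()-ed and appended.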

    def execute(self):
        self._hasExecuted = True
        msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))

        self._exeStart = os.times()
        msg.debug('exeStart time is {0}'.format(self._exeStart))
        if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
            msg.info('execOnly flag is set - execution will now switch, replacing the transform')
            os.execvp(self._cmd[0], self._cmd)

        encargs = {'encoding' : 'utf8'}
        try:
            # if we are already inside a container, then
            # must map /srv of the nested container to /srv of the parent container:
            # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
            if self._alreadyInContainer and self._containerSetup is not None:
                msg.info("chdir /srv to launch a nested container for the substep")
                os.chdir("/srv")
            p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
            # go back to the original working directory immediately after to store prmon in it
            if self._alreadyInContainer and self._containerSetup is not None:
                msg.info("chdir {} after launching the nested container".format(self._workdir))
                os.chdir(self._workdir)

            if self._memMonitor:
                try:
                    self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
                    self._memFullFile = 'prmon.full.' + self._name
                    memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
                                         '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
                                         '--interval', '30']
                    mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
                    # TODO - link mem.full.current to mem.full.SUBSTEP
                except Exception as e:
                    msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
                    self._memMonitor = False

            while p.poll() is None:
                line = p.stdout.readline()
                if line:
                    self._echologger.info(line.rstrip())
            # Hoover up remaining buffered output lines
            for line in p.stdout:
                self._echologger.info(line.rstrip())

            self._rc = p.returncode
            msg.info('%s executor returns %d', self._name, self._rc)
            self._exeStop = os.times()
            msg.debug('exeStop time is {0}'.format(self._exeStop))
        except OSError as e:
            errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
        finally:
            if self._memMonitor:
                try:
                    mem_proc.send_signal(signal.SIGUSR1)
                    countWait = 0
                    while (not mem_proc.poll()) and countWait < 10:
                        time.sleep(0.1)
                        countWait += 1
                except OSError:
                    pass

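    # Monitoring note (illustrative): for an executor named 'RAWtoALL' the
    # prmon invocation above writes three files into the working directory:
    #
    #     prmon.full.RAWtoALL          # time series of process resource usage
    #     prmon.summary.RAWtoALL.json  # summary, parsed back in postExecute()
    #     prmon.RAWtoALL.log           # prmon's own log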

    def postExecute(self):
        if hasattr(self._exeLogFile, 'close'):
            self._exeLogFile.close()
        if self._memMonitor:
            try:
                memFile = open(self._memSummaryFile)
                self._memStats = json.load(memFile)
            except Exception as e:
                msg.warning('Failed to load JSON memory summary file {0}: {1}'.format(self._memSummaryFile, e))
                self._memMonitor = False
                self._memStats = {}


    def validate(self):
        if self._valStart is None:
            self._valStart = os.times()
            msg.debug('valStart time is {0}'.format(self._valStart))
        self._hasValidated = True

        # Simple binary validation of the return code
        if self._rc == 0:
            msg.info('Executor {0} validated successfully (return code {1})'.format(self._name, self._rc))
            self._isValidated = True
            self._errMsg = ''
        else:
            # Want to learn as much as possible from the non-zero code
            # this is a bit hard in general, although one can do signals.
            # Probably need to be more specific per exe, i.e., athena non-zero codes
            self._isValidated = False
            if self._rc < 0:
                # Map return codes to what the shell gives (128 + SIGNUM)
                self._rc = 128 - self._rc
            if trfExit.codeToSignalname(self._rc) != "":
                self._errMsg = '{0} got a {1} signal (exit code {2})'.format(self._name, trfExit.codeToSignalname(self._rc), self._rc)
            else:
                self._errMsg = 'Non-zero return code from %s (%d)' % (self._name, self._rc)
            raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_FAIL'), self._errMsg)

        # Check event counts, unless this was explicitly skipped
        if 'checkEventCount' in self.conf.argdict and self.conf.argdict['checkEventCount'].returnMyValue(exe=self) is False:
            msg.info('Event counting for substep {0} is skipped'.format(self.name))
        else:
            if 'mpi' in self.conf.argdict and not mpi.mpiShouldValidate():
                msg.info('MPI mode -- skipping output event count check')
            else:
                checkcount=trfValidation.eventMatch(self)
                checkcount.decide()
                self._eventCount = checkcount.eventCount
                msg.info('Event counting for substep {0} passed'.format(self.name))

        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))

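    # Worked example (illustrative) of the 128 + SIGNUM mapping above: a child
    # process killed by SIGSEGV gives Popen.returncode == -11, so
    #
    #     self._rc = 128 - (-11)   # = 139, matching the shell's exit status
    #
    # and trfExit.codeToSignalname(139) can then name the signal in the error.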


class athenaExecutor(scriptExecutor):
    _exitMessageLimit = 200 # Maximum error message length to report in the exitMsg
    _defaultIgnorePatternFile = ['atlas_error_mask.db']

    # Athena executor: runs a single athena job step, handling job options
    # generation, asetup/DBRelease setup, AthenaMP/MT configuration and
    # logfile validation
    def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
                 inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'],
                 substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = {}, runtimeRunargs = {},
                 literalRunargs = [], dataArgs = [], checkEventCount = False, errorMaskFiles = None,
                 manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):

        self._substep = forceToAlphaNum(substep)
        self._inputEventTest = inputEventTest
        self._tryDropAndReload = tryDropAndReload
        self._extraRunargs = extraRunargs
        self._runtimeRunargs = runtimeRunargs
        self._literalRunargs = literalRunargs
        self._dataArgs = dataArgs
        self._errorMaskFiles = errorMaskFiles
        self._inputDataTypeCountCheck = inputDataTypeCountCheck
        self._disableMT = disableMT
        self._disableMP = disableMP
        self._onlyMP = onlyMP
        self._onlyMT = onlyMT
        self._onlyMPWithRunargs = onlyMPWithRunargs
        self._skeletonCA = skeletonCA

        if perfMonFile:
            self._perfMonFile = None
            msg.debug("Resource monitoring from PerfMon is now deprecated")

        # SkeletonFile can be None (disable) or a string or a list of strings - normalise it here
        if isinstance(skeletonFile, str):
            self._skeleton = [skeletonFile]
        else:
            self._skeleton = skeletonFile

        super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                             exeArgs=exeArgs, memMonitor=memMonitor)

        # Add athena specific metadata
        self._extraMetadata.update({'substep': substep})

        # Setup JO templates
        if self._skeleton or self._skeletonCA:
            self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
        else:
            self._jobOptionsTemplate = None

    @property
    def inputDataTypeCountCheck(self):
        return self._inputDataTypeCountCheck

    @inputDataTypeCountCheck.setter
    def inputDataTypeCountCheck(self, value):
        self._inputDataTypeCountCheck = value

    @property
    def substep(self):
        return self._substep

    @property
    def disableMP(self):
        return self._disableMP

    @disableMP.setter
    def disableMP(self, value):
        self._disableMP = value

    @property
    def disableMT(self):
        return self._disableMT

    @disableMT.setter
    def disableMT(self, value):
        self._disableMT = value

    @property
    def onlyMP(self):
        return self._onlyMP

    @onlyMP.setter
    def onlyMP(self, value):
        self._onlyMP = value

    @property
    def onlyMT(self):
        return self._onlyMT

    @onlyMT.setter
    def onlyMT(self, value):
        self._onlyMT = value

    def preExecute(self, input = set(), output = set()):
        self.setPreExeStart()
        msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

        # Check we actually have events to process!
        inputEvents = 0
        dt = ""
        if self._inputDataTypeCountCheck is None:
            self._inputDataTypeCountCheck = input
        for dataType in self._inputDataTypeCountCheck:
            if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
                continue

            thisInputEvents = self.conf.dataDictionary[dataType].nentries
            if thisInputEvents > inputEvents:
                inputEvents = thisInputEvents
                dt = dataType

        # Now take into account skipEvents and maxEvents
        if ('skipEvents' in self.conf.argdict and
            self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
            mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        else:
            mySkipEvents = 0

        if ('maxEvents' in self.conf.argdict and
            self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
            myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        else:
            myMaxEvents = -1

        # Any events to process...?
        if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
                                                            'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))

        try:
            # Expected events to process
            if (myMaxEvents != -1):
                if (self.inData and next(iter(self.inData)) == 'inNULL'):
                    expectedEvents = myMaxEvents
                else:
                    expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
            else:
                expectedEvents = inputEvents-mySkipEvents
        except TypeError:
            # catching type error from UNDEFINED inputEvents count
            msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
            expectedEvents = 0

        # Decide whether an OS container setup is needed for this substep
        OSSetupString = None

        # Extract the asetup string
        asetupString = None
        legacyThreadingRelease = False
        if 'asetup' in self.conf.argdict:
            asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
            legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
        else:
            msg.info('Asetup report: {0}'.format(asetupReport()))

        if asetupString is not None:
            legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
            currentOS = os.environ['ALRB_USER_PLATFORM']
            if legacyOSRelease and "centos7" not in currentOS:
                OSSetupString = "centos7"
                msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))

        # allow overriding the container OS using a flag
        if 'runInContainer' in self.conf.argdict:
            OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
            msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
        if OSSetupString is not None and asetupString is None:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                            '--asetup must be used for the substep which requires --runInContainer')

        # Conditional MP based on runtime arguments
        if self._onlyMPWithRunargs:
            for k in self._onlyMPWithRunargs:
                if k in self.conf._argdict:
                    self._onlyMP = True

        # Check the consistency of parallel configuration: CLI flags + environment.
        if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
            ('ATHENA_CORE_NUMBER' not in os.environ)):
            # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
            msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
        else:
            # Try to detect AthenaMT mode, number of threads and number of concurrent events
            if not self._disableMT:
                self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)

            # Try to detect AthenaMP mode and number of workers
            if not self._disableMP:
                self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)

            # Check that we actually support MT
            if self._onlyMP and self._athenaMT > 0:
                msg.info("This configuration does not support MT, falling back to MP")
                if self._athenaMP == 0:
                    self._athenaMP = self._athenaMT
                self._athenaMT = 0
                self._athenaConcurrentEvents = 0

            # Check that we actually support MP
            if self._onlyMT and self._athenaMP > 0:
                msg.info("This configuration does not support MP, using MT")
                if self._athenaMT == 0:
                    self._athenaMT = self._athenaMP
                    self._athenaConcurrentEvents = self._athenaMP
                self._athenaMP = 0

            # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
            # which also avoids issues with zero sized files. Distinguish from the no-input case (e.g. evgen)
            if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
                msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
                self._disableMP = True
                self._athenaMP = 0

        # Handle executor steps
        if self.conf.totalExecutorSteps > 1:
            for dataType in output:
                if self.conf._dataDictionary[dataType].originalName:
                    self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
                else:
                    self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
                self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
                msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))

        # And if this is (still) athenaMP, then set some options for workers and output file report
        if self._athenaMP > 0:
            self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
            self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
            self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
            if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
                self._athenaMPReadEventOrders = True
            else:
                self._athenaMPReadEventOrders = False
            # Decide on scheduling
            if ('athenaMPStrategy' in self.conf.argdict and
                (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
                self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
            else:
                self._athenaMPStrategy = 'SharedQueue'

            # See if we have options for the target output file size
            if 'athenaMPMergeTargetSize' in self.conf.argdict:
                for dataType in output:
                    if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
                        self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
                        msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
                    else:
                        # Use a globbing strategy
                        matchedViaGlob = False
                        for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
                            if fnmatch(dataType, mtsType):
                                self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
                                msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
                                matchedViaGlob = True
                                break
                        if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
                            self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
                            msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))

            # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
            # so that the mother process output file (if it exists) can be used directly
            # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
            for dataType in output:
                if self.conf.totalExecutorSteps <= 1:
                    self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
                if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
                    if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
                        msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
                    else:
                        self.conf._dataDictionary[dataType].value[0] += "_000"
                        msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
        else:
            self._athenaMPWorkerTopDir = self._athenaMPFileReport = None

        # MPI mode setup
        if 'mpi' in self.conf.argdict:
            msg.info("Running in MPI mode")
            mpi.setupMPIConfig(output, self.conf.dataDictionary)

        # Write the job options template, if we have skeletons to process
        if self._skeleton or self._skeletonCA:
            inputFiles = dict()
            for dataType in input:
                inputFiles[dataType] = self.conf.dataDictionary[dataType]
            outputFiles = dict()
            for dataType in output:
                outputFiles[dataType] = self.conf.dataDictionary[dataType]

            # See if we have any 'extra' file arguments
            nameForFiles = commonExecutorStepName(self._name)
            for dataType, dataArg in self.conf.dataDictionary.items():
                if isinstance(dataArg, list) and dataArg:
                    if self.conf.totalExecutorSteps <= 1:
                        raise ValueError('Multiple input arguments provided but only running one substep')
                    if self.conf.totalExecutorSteps != len(dataArg):
                        raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')

                    if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
                        inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
                else:
                    if dataArg.io == 'input' and nameForFiles in dataArg.executor:
                        inputFiles[dataArg.subtype] = dataArg

            msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

            # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
            self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
                                                                           output = outputFiles)

        # Add input/output file information to the extra metadata
        if len(input) > 0:
            self._extraMetadata['inputs'] = list(input)
        if len(output) > 0:
            self._extraMetadata['outputs'] = list(output)

        # Handle any DBRelease setup requested for this substep
        dbrelease = dbsetup = None
        if 'DBRelease' in self.conf.argdict:
            dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
            if path.islink(dbrelease):
                dbrelease = path.realpath(dbrelease)
        if dbrelease:
            # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
            dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
            if dbdMatch:
                msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
                if not os.access(dbrelease, os.R_OK):
                    msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                    msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                    dbrelease = dbdMatch.group(1)
                    dbsetup = cvmfsDBReleaseCheck(dbrelease)
                else:
                    # Check if the DBRelease is setup
                    msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
                    unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                    if unpacked:
                        # Now run the setup.py script to customise the paths to the current location...
                        setupDBRelease(dbsetup)
            # For cvmfs we want just the X.Y.Z release string (and also support 'current')
            else:
                dbsetup = cvmfsDBReleaseCheck(dbrelease)

        # Look for environment updates and prepare the athena command line
        self._envUpdate = trfEnv.environmentUpdate()
        self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
        self._prepAthenaCommandLine()

        super(athenaExecutor, self).preExecute(input, output)

        # Now we always write a wrapper, because it's very convenient for re-running individual substeps
        # This will have asetup and/or DB release setups in it
        # Do this last in this preExecute as the _cmd needs to be finalised
        msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
        self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
        msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))

    def postExecute(self):
        super(athenaExecutor, self).postExecute()
        # MPI merging
        if 'mpi' in self.conf.argdict:
            mpi.mergeOutputs()

        # Handle executor substeps
        if self.conf.totalExecutorSteps > 1:
            if self._athenaMP > 0:
                outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
                athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
            if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
                # first loop over datasets for the output
                for dataType in self._output:
                    newValue = []
                    if self._athenaMP > 0:
                        # assume the same number of workers all the time
                        for i in range(self.conf.totalExecutorSteps):
                            for v in self.conf.dataDictionary[dataType].value:
                                newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
                                                          '_{0}{1}_'.format(executorStepSuffix, i)))
                    else:
                        self.conf.dataDictionary[dataType].multipleOK = True
                        # just combine all executors
                        for i in range(self.conf.totalExecutorSteps):
                            newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
                    self.conf.dataDictionary[dataType].value = newValue

                    # do the merging if needed
                    if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
                        self._smartMerge(self.conf.dataDictionary[dataType])

        # If this was an athenaMP run then we need to update output files
        elif self._athenaMP > 0:
            outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])

            skipFileChecks=False
            if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
                skipFileChecks=True
            athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
            for dataType in self._output:
                if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
                    self._smartMerge(self.conf.dataDictionary[dataType])

        if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
            self._targzipJiveXML()

        # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
        # This is a bit ugly to have such a specific feature here though
        # TODO
        # The best is to have a general approach so that user can extract useful info from log
        # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
        # in which the wanted variable is grouped by a name, then transforms could decode the pattern
        # and use it to extract required info and do the summation during log scan.
        if self._logFileName=='log.ReSim' and self.name=='ReSim':
            msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
            self._resimevents = reportEventsPassedSimFilter(self._logFileName)

        # Remove intermediate input/output files of sub-steps
        # Delete only files with io="temporary" which are files with pattern "tmp*"
        # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
        # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
        # Enable if --deleteIntermediateOutputfiles is set
        if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
            inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])

            for k, v in inputDataDictionary.items():
                if not v.io == 'temporary':
                    continue
                for filename in v.value:
                    if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
                        msg.info("Removing intermediate {0} input file {1}".format(k, filename))
                        # Check if symbolic link and delete also linked file
                        if (os.path.realpath(filename) != filename):
                            targetpath = os.path.realpath(filename)
                            os.unlink(filename)
                            if (targetpath) and os.access(targetpath, os.R_OK):
                                os.unlink(targetpath)


    def validate(self):
        self.setValStart()
        self._hasValidated = True
        deferredException = None
        memLeakThreshold = 5000
        _hasMemLeak = False

        try:
            super(athenaExecutor, self).validate()
        except trfExceptions.TransformValidationException as e:
            # In this case we hold this exception until the logfile has been scanned
            msg.error('Validation of return code failed: {0!s}'.format(e))
            deferredException = e

        # Analyse the memory monitor output for a possible memory leak
        if self._memFullFile:
            msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
            self._memLeakResult = analytic().getFittedData(self._memFullFile)
            if self._memLeakResult:
                if self._memLeakResult['slope'] > memLeakThreshold:
                    _hasMemLeak = True
                    msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
            else:
                msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
        else:
            msg.info('No memory monitor file to be analysed')

        # Logfile scan setup
        # Always use ignorePatterns from the command line
        # For patterns in files, prefer the command line first, then any special settings for
        # this executor, then fallback to the standard default (atlas_error_mask.db)
        if 'ignorePatterns' in self.conf.argdict:
            igPat = self.conf.argdict['ignorePatterns'].value
        else:
            igPat = []
        if 'ignoreFiles' in self.conf.argdict:
            ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
        elif self._errorMaskFiles is not None:
            ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
        else:
            ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)

        # Now actually scan my logfile
        msg.info('Scanning logfile {0} for errors in substep {1}'.format(self._logFileName, self._substep))
        self._logScan = trfValidation.athenaLogFileReport(logfile=self._logFileName, substepName=self._substep,
                                                          ignoreList=ignorePatterns)
        worstError = self._logScan.worstError()
        eventLoopWarnings = self._logScan.eventLoopWarnings()

        # In general we add the error message to the exit message, but if it's too long then don't do
        # that and just say look in the jobReport
        if worstError['firstError']:
            if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
                if 'CoreDumpSvc' in worstError['firstError']['message']:
                    exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
                elif 'G4Exception' in worstError['firstError']['message']:
                    exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
                else:
                    exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
            else:
                exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
        else:
            exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])

        # If we failed on the rc, then abort now
        if deferredException is not None:
            # Add any logfile information we have
            if worstError['nLevel'] >= stdLogLevels['ERROR']:
                deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
            # Add the result of memory analysis
            if _hasMemLeak:
                deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
            raise deferredException

        # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
        if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
            msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
        # Act as if ignoreErrors=True if running in MPI, because we want to be tolerant to the occasional event failure
        elif worstError['nLevel'] >= stdLogLevels['ERROR'] and (not mpi.mpiShouldValidate()):
            msg.warning('Found {0} in the logfile in MPI rank {1} but moving on to be failure-tolerant'.format(worstError['level'], mpi.getMPIRank()))
        elif worstError['nLevel'] >= stdLogLevels['ERROR']:
            self._isValidated = False
            msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
            # Add the result of memory analysis
            if _hasMemLeak:
                exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
            raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
                                                               'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))

        # Print event loop warnings
        if (len(eventLoopWarnings) > 0):
            msg.warning('Found WARNINGS in the event loop, as follows:')
            for element in eventLoopWarnings:
                msg.warning('{0} {1} ({2} instances)'.format(element['item']['service'], element['item']['message'], element['count']))

        # Must be ok if we got here!
        msg.info('Executor {0} has validated successfully'.format(self.name))
        self._isValidated = True

        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))


    def _isCAEnabled(self):
        # CA not present
        if 'CA' not in self.conf.argdict:
            # If there is no legacy skeleton, then we are running with CA
            if not self._skeleton:
                return True
            else:
                return False

        # CA present but None, all substeps running with CA
        if self.conf.argdict['CA'] is None:
            return True

        # CA enabled for a substep, running with CA
        if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
            return True

        return False
1452 
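    # Decision table sketch for _isCAEnabled() (illustrative summary):
    #
    #   --CA absent,  no legacy skeleton  -> True  (CA-only configuration)
    #   --CA absent,  legacy skeleton set -> False (legacy job options)
    #   --CA given with no value          -> True  (all substeps use CA)
    #   --CA given per-substep            -> returnMyValue() decides per substep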

    # Prepare the command line used to invoke athena
    def _prepAthenaCommandLine(self):
1458  if 'athena' in self.conf.argdict:
1459  self._exe = self.conf.argdict['athena'].value
1460  self._cmd = [self._exe]
1461 
1462  # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
1463  currentSubstep = None
1464  if 'athenaopts' in self.conf.argdict:
1465  currentName = commonExecutorStepName(self.name)
1466  if currentName in self.conf.argdict['athenaopts'].value:
1467  currentSubstep = currentName
1468  if self.substep in self.conf.argdict['athenaopts'].value:
1469  msg.info('Athenaopts found for {0} and {1}, joining options. '
1470  'Consider changing your configuration to use just the name or the alias of the substep.'
1471  .format(currentSubstep, self.substep))
1472  self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
1473  del self.conf.argdict['athenaopts'].value[self.substep]
1474  msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
1475  elif self.substep in self.conf.argdict['athenaopts'].value:
1476  currentSubstep = self.substep
1477  elif 'all' in self.conf.argdict['athenaopts'].value:
1478  currentSubstep = 'all'
1479 
1480  # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
1481  preLoadUpdated = dict()
1482  if 'LD_PRELOAD' in self._envUpdate._envdict:
1483  preLoadUpdated[currentSubstep] = False
1484  if 'athenaopts' in self.conf.argdict:
1485  if currentSubstep is not None:
1486  for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
1487  # This code is pretty ugly as the athenaopts argument contains
1488  # strings which are really key/value pairs
1489  if athArg.startswith('--preloadlib'):
1490  try:
1491  i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
1492  v = athArg.split('=', 1)[1]
1493  msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1494  newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
1495  self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
1496  except Exception as e:
1497  msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
1498  preLoadUpdated[currentSubstep] = True
1499  break
1500  if not preLoadUpdated[currentSubstep]:
1501  msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1502  if 'athenaopts' in self.conf.argdict:
1503  if currentSubstep is not None:
1504  self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
1505  else:
1506  self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
1507  else:
1508  self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])
1509 
1510  # Now update command line with the options we have (including any changes to preload)
1511  if 'athenaopts' in self.conf.argdict:
1512  if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
1513  self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
1514  elif currentSubstep in self.conf.argdict['athenaopts'].value:
1515  self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])
1516 
1517  if currentSubstep is None:
1518  currentSubstep = 'all'
1519 
1520  if self._tryDropAndReload:
1521  if self._isCAEnabled():
1522  msg.info('Ignoring "--drop-and-reload" for CA-based transforms; the configuration is cleaned up anyway')
1523  elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1524  msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
1525  elif 'athenaopts' in self.conf.argdict:
1526  athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
1527  # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
1528  if currentSubstep in self.conf.argdict['athenaopts'].value:
1529  conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
1530  if len(conflictOpts) > 0:
1531  msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
1532  else:
1533  msg.info('Appending "--drop-and-reload" to athena options')
1534  self._cmd.append('--drop-and-reload')
1535  else:
1536  msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
1537  self._cmd.append('--drop-and-reload')
1538  else:
1539  # This is the 'standard' case - so drop and reload should be ok
1540  msg.info('Appending "--drop-and-reload" to athena options')
1541  self._cmd.append('--drop-and-reload')
1542  else:
1543  msg.info('Skipping test for "--drop-and-reload" in this executor')
1544 
1545  if not self._isCAEnabled(): # For CA jobs, threads and nprocs are set in the runargs file
1546  # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
1547  if self._athenaMT > 0 and not self._disableMT:
1548  if not ('athenaopts' in self.conf.argdict and
1549  any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value.get(currentSubstep, []))):
1550  self._cmd.append('--threads=%s' % str(self._athenaMT))
1551 
1552  # For AthenaMP apply --nprocs=N if processes have been configured via ATHENA_CORE_NUMBER + multiprocess
1553  if self._athenaMP > 0 and not self._disableMP:
1554  if not ('athenaopts' in self.conf.argdict and
1555  any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value.get(currentSubstep, []))):
1556  self._cmd.append('--nprocs=%s' % str(self._athenaMP))
1557 
1558  # Add topoptions
1559  # Note that _writeAthenaWrapper removes these from the end of _cmd when preparing the options for VTuneCommand, so it assumes they come last.
1560  if self._skeleton or self._skeletonCA:
1561  self._cmd += self._topOptionsFiles
1562  msg.info('Updated script arguments with topoptions: %s', self._cmd)
1563 
1564 
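# Illustrative sketch, not part of trfExe.py: the --preloadlib update above
# is a set union over ':'-separated library paths, so duplicates collapse
# but the original ordering is not preserved.
def merge_preloads(current, extra):
    return ':'.join(set(current.split(':')) | set(extra.split(':')))

# e.g. merge_preloads('libA.so:libB.so', 'libB.so:libC.so') yields the three
# unique libraries joined with ':' (in arbitrary order).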
1565  ## @brief Write a wrapper script which runs asetup and then Athena
1566  def _writeAthenaWrapper(
1567  self,
1568  asetup = None,
1569  dbsetup = None,
1570  ossetup = None
1571  ):
1572  self._originalCmd = self._cmd
1573  self._asetup = asetup
1574  self._dbsetup = dbsetup
1575  self._containerSetup = ossetup
1576  self._workdir = os.getcwd()
1577  self._alreadyInContainer = self._workdir.startswith("/srv")
1578  self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
1579  self._setupFile = 'setup.{name}.sh'.format(name = self._name)
1580 
1581  # Create a setupATLAS script
1582  setupATLAS = 'my_setupATLAS.sh'
1583  with open(setupATLAS, 'w') as f:
1584  print("#!/bin/bash", file=f)
1585  print("""
1586 if [ -z "$ATLAS_LOCAL_ROOT_BASE" ]; then
1587  export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1588 fi
1589 source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1590  , file=f)
1591  os.chmod(setupATLAS, 0o755)
1592 
1593  msg.debug(
1594  'Preparing wrapper file {wrapperFileName} with '
1595  'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1596  wrapperFileName = self._wrapperFile,
1597  asetupStatus = self._asetup,
1598  dbsetupStatus = self._dbsetup
1599  )
1600  )
1601 
1602  container_cmd = None
1603  try:
1604  with open(self._wrapperFile, 'w') as wrapper:
1605  print('#!/bin/sh', file=wrapper)
1606  if self._containerSetup is not None:
1607  container_cmd = [ os.path.abspath(setupATLAS),
1608  "-c",
1609  self._containerSetup,
1610  "--pwd",
1611  self._workdir,
1612  "-s",
1613  os.path.join('.', self._setupFile),
1614  "-r"]
1615  print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1616  print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
1617  print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1618  file = wrapper)
1619  print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1620  file=wrapper)
1621  print('echo', file=wrapper)
1622 
1623  if asetup:
1624  wfile = wrapper
1625  asetupFile = None
1626  # if the substep is executed within a container, setup athena with a separate script
1627  # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
1628  if self._containerSetup is not None:
1629  asetupFile = open(self._setupFile, 'w')
1630  wfile = asetupFile
1631  print(f'source ./{setupATLAS} -q', file=wfile)
1632  print(f'asetup {asetup}', file=wfile)
1633  print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1634  if dbsetup:
1635  dbroot = path.dirname(dbsetup)
1636  dbversion = path.basename(dbroot)
1637  print("# DBRelease setup", file=wrapper)
1638  print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1639  print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1640  print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1641  print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1642  print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
1643  print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1644  if self._disableMT:
1645  print("# AthenaMT explicitly disabled for this executor", file=wrapper)
1646  if self._disableMP:
1647  print("# AthenaMP explicitly disabled for this executor", file=wrapper)
1648  if self._envUpdate.len > 0:
1649  for envSetting in self._envUpdate.values:
1650  if not envSetting.startswith('LD_PRELOAD'):
1651  print("export", envSetting, file=wrapper)
1652  # If Valgrind is engaged, a serialised Athena configuration file
1653  # is generated for use with a subsequent run of Athena with
1654  # Valgrind.
1655  if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1656  msg.info('Valgrind engaged')
1657  # Define the file name of the serialised Athena
1658  # configuration.
1659  AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1660  name = self._name
1661  )
1662  # Run Athena for generation of its serialised configuration.
1663  print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1664  print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1665  # Generate a Valgrind command, suppressing or using the default
1666  # options as requested, plus any extra options requested.
1667  if 'valgrindDefaultOpts' in self.conf._argdict:
1668  defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
1669  else:
1670  defaultOptions = True
1671  if 'valgrindExtraOpts' in self.conf._argdict:
1672  extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
1673  else:
1674  extraOptionsList = None
1675  msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
1676  msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1677  command = ValgrindCommand(
1678  defaultOptions = defaultOptions,
1679  extraOptionsList = extraOptionsList,
1680  AthenaSerialisedConfigurationFile = \
1681  AthenaSerialisedConfigurationFile
1682  )
1683  msg.debug("Valgrind command: {command}".format(command = command))
1684  print(command, file=wrapper)
1685  # If VTune is engaged, a serialised Athena configuration file
1686  # is generated for use with a subsequent run of Athena with
1687  # VTune.
1688  elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
1689  msg.info('VTune engaged')
1690  # Define the file name of the serialised Athena
1691  # configuration.
1692  AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1693  name = self._name
1694  )
1695  # Run Athena for generation of its serialised configuration.
1696  print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1697  print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1698  # Generate a VTune command, suppressing or using the default
1699  # options as requested, plus any extra options requested.
1700  if 'vtuneDefaultOpts' in self.conf._argdict:
1701  defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
1702  else:
1703  defaultOptions = True
1704  if 'vtuneExtraOpts' in self.conf._argdict:
1705  extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
1706  else:
1707  extraOptionsList = None
1708 
1709  # replace the _topOptionsFiles from the Athena command with the AthenaSerialisedConfigurationFile.
1710  if (self._skeleton or self._skeletonCA) and len(self._topOptionsFiles) > 0:
1711  AthenaCommand = self._cmd[:-len(self._topOptionsFiles)]
1712  else:
1713  AthenaCommand = self._cmd
1714  AthenaCommand.append(AthenaSerialisedConfigurationFile)
1715 
1716  msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
1717  msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
1718  command = VTuneCommand(
1719  defaultOptions = defaultOptions,
1720  extraOptionsList = extraOptionsList,
1721  AthenaCommand = AthenaCommand
1722  )
1723  msg.debug("VTune command: {command}".format(command = command))
1724  print(command, file=wrapper)
1725  else:
1726  msg.info('Valgrind/VTune not engaged')
1727  # run Athena command
1728  print(' '.join(self._cmd), file=wrapper)
1729  os.chmod(self._wrapperFile, 0o755)
1730  except OSError as e:
1731  errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
1732  fileName = self._wrapperFile,
1733  error = e
1734  )
1735  msg.error(errMsg)
1736  raise trfExceptions.TransformExecutionException(
1737  trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
1738  errMsg
1739  )
1740  self._cmd = [ path.join('.', self._wrapperFile) ]
1741  if self._containerSetup is not None:
1742  asetupFile.close()
1743  self._cmd = container_cmd + self._cmd
1744 
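# Illustrative sketch, not part of trfExe.py: the profiling branches above
# follow one pattern - serialise the Athena configuration with --config-only,
# then run the profiler over the pickle. A minimal wrapper writer under that
# assumption (all names here are hypothetical; 'os' is already imported at
# the top of this module):
def write_profiler_wrapper(wrapper_path, athena_cmd, profiler_cmd, pkl='AthenaConf.pkl'):
    with open(wrapper_path, 'w') as wrapper:
        print('#!/bin/sh', file=wrapper)
        # Step 1: config-only Athena run; abort the wrapper on failure
        print(' '.join(athena_cmd) + ' --config-only={0}'.format(pkl), file=wrapper)
        print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
        # Step 2: profiler run over the serialised configuration
        print(' '.join(profiler_cmd + [pkl]), file=wrapper)
    os.chmod(wrapper_path, 0o755)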
1745 
1746 
1747  ## @brief Manage smart merging of output files
1748  def _smartMerge(self, fileArg):
1749 
1750  if 'selfMerge' not in dir(fileArg):
1751  msg.info('Files in {0} cannot be merged (no selfMerge() method is implemented)'.format(fileArg.name))
1752  return
1753 
1754  if fileArg.mergeTargetSize == 0:
1755  msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
1756  return
1757 
1758 
1759  mergeCandidates = [list()]
1760  currentMergeSize = 0
1761  for fname in fileArg.value:
1762  size = fileArg.getSingleMetadata(fname, 'file_size')
1763  if not isinstance(size, int):
1764  msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
1765  return
1766  # if the current merge group is empty, we must add the file
1767  if len(mergeCandidates[-1]) == 0:
1768  msg.debug('Adding file {0} to current empty merge list'.format(fname))
1769  mergeCandidates[-1].append(fname)
1770  currentMergeSize += size
1771  continue
1772  # see if adding this file gets us closer to the target size (but always add if target size is negative)
1773  if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
1774  msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
1775  mergeCandidates[-1].append(fname)
1776  currentMergeSize += size
1777  continue
1778  # close this merge list and start a new one
1779  msg.debug('Starting a new merge list with file {0}'.format(fname))
1780  mergeCandidates.append([fname])
1781  currentMergeSize = size
1782 
1783  msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
1784 
1785  if len(mergeCandidates) == 1:
1786  # Merging to a single file, so use the original filename that the transform
1787  # was started with
1788  mergeNames = [fileArg.originalName]
1789  else:
1790  # Multiple merge targets, so we need a set of unique names
1791  counter = 0
1792  mergeNames = []
1793  for mergeGroup in mergeCandidates:
1794  # Note that the individual worker files get numbered with 3 digit padding,
1795  # so these non-padded merges should be fine
1796  mergeName = fileArg.originalName + '_{0}'.format(counter)
1797  while path.exists(mergeName):
1798  counter += 1
1799  mergeName = fileArg.originalName + '_{0}'.format(counter)
1800  mergeNames.append(mergeName)
1801  counter += 1
1802  # Now actually do the merges
1803  for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
1804  msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
1805  if len(mergeGroup) <= 1:
1806  msg.info('Skip merging for single file')
1807  else:
1808 
1809  self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
1810 
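# Illustrative sketch, not part of trfExe.py: the first pass above is a
# greedy packing. A file joins the current group while doing so moves the
# group total closer to mergeTargetSize; otherwise it starts a new group.
# 'sizes' maps file name to size in bytes (hypothetical input).
def group_for_merge(files, sizes, target):
    groups = [[]]
    current = 0
    for fname in files:
        size = sizes[fname]
        if not groups[-1]:
            # an empty group always accepts the file
            groups[-1].append(fname)
            current = size
        elif target < 0 or abs(current + size - target) < abs(current - target):
            groups[-1].append(fname)
            current += size
        else:
            groups.append([fname])
            current = size
    return groups

# e.g. group_for_merge(['a', 'b', 'c'], {'a': 60, 'b': 60, 'c': 60}, 100)
# returns [['a', 'b'], ['c']]: adding 'b' moves the total from 60 to 120 and
# |120-100| < |60-100|, but adding 'c' would only move it further away.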
1811 
1812  def _targzipJiveXML(self):
1813  # tar+gzip the JiveXML files
1814  targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
1815  if os.path.exists(targetTGZName):
1816  os.remove(targetTGZName)
1817 
1818  import tarfile
1819  fNameRE = re.compile(r"JiveXML_\d+_\d+\.xml")
1820 
1821  # force gz compression
1822  tar = tarfile.open(targetTGZName, "w:gz")
1823  for fName in os.listdir('.'):
2324  # add only files whose entire name matches the pattern
2325  if fNameRE.fullmatch(fName):
1827  msg.info('adding %s to %s', fName, targetTGZName)
1828  tar.add(fName)
1829 
1830  tar.close()
1831  msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
1832 
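# Illustrative sketch, not part of trfExe.py: the same collection can be
# written with a context manager and re.fullmatch, which expresses the
# "whole name matches" test directly ('os' and 're' are imported at the top
# of this module).
import tarfile

def targz_matching(target, pattern=r"JiveXML_\d+_\d+\.xml"):
    rx = re.compile(pattern)
    with tarfile.open(target, 'w:gz') as tar:   # 'w:gz' forces gzip compression
        for fname in os.listdir('.'):
            if rx.fullmatch(fname):             # match must span the full name
                tar.add(fname)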
1833 
1834  ## @brief Athena executor where failure is not considered fatal
1835  class optionalAthenaExecutor(athenaExecutor):
1836 
1837  # Here we validate, but will suppress any errors
1838  def validate(self):
1839  self.setValStart()
1840  try:
1841  super(optionalAthenaExecutor, self).validate()
1842  except trfExceptions.TransformValidationException as e:
1843  # In this case we hold this exception until the logfile has been scanned
1844  msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
1845  self._isValidated = False
1846  self._errMsg = e.errMsg
1847  self._rc = e.errCode
1848  self._valStop = os.times()
1849  msg.debug('valStop time is {0}'.format(self._valStop))
1850 
1851 
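# Illustrative sketch, not part of trfExe.py: the validate() above turns a
# fatal validation exception into recorded state so that the logfile can be
# scanned before the final verdict. The same pattern in isolation:
def validate_deferred(validator):
    """Run 'validator' (a callable that may raise); return (ok, rc, msg)."""
    try:
        validator()
        return True, 0, ''
    except Exception as e:      # stands in for TransformValidationException
        # hold the failure instead of propagating it immediately
        return False, getattr(e, 'errCode', 1), str(e)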
1853 
1854  class POOLMergeExecutor(athenaExecutor):
1863  ## @brief Initialise hybrid POOL merger athena executor
1864  def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
1865  inData = set(), outData = set(), exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True,
1866  perfMonFile = None, tryDropAndReload = True, extraRunargs = {},
1867  manualDataDictionary = None, memMonitor = True):
1868 
1869  super(POOLMergeExecutor, self).__init__(name, trf=trf, conf=conf, skeletonFile=skeletonFile, skeletonCA=skeletonCA,
1870  inData=inData, outData=outData, exe=exe, exeArgs=exeArgs, substep=substep,
1871  inputEventTest=inputEventTest, perfMonFile=perfMonFile,
1872  tryDropAndReload=tryDropAndReload, extraRunargs=extraRunargs,
1873  manualDataDictionary=manualDataDictionary, memMonitor=memMonitor)
1874 
1875  def preExecute(self, input = set(), output = set()):
1876  self.setPreExeStart()
1877  super(POOLMergeExecutor, self).preExecute(input=input, output=output)
1878 
1879 
1880  def execute(self):
1881  # First call the parent executor, which will manage the athena execution for us
1882  super(POOLMergeExecutor, self).execute()
1883 
1884 
1885 
1886  ## @brief Specialist executor to manage the handling of multiple implicit input and output files within the derivation framework
1887  class reductionFrameworkExecutor(athenaExecutor):
1888 
1889  ## @brief Take inputDAODFile and setup the actual outputs needed in this job
1890  def preExecute(self, input=set(), output=set()):
1891  self.setPreExeStart()
1892  msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
1893  if 'NTUP_PILEUP' not in output:
1894  # New derivation framework transform uses "formats"
1895  if 'formats' not in self.conf.argdict:
1896  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
1897  'No derivation configuration specified')
1898 
1899  if ('DAOD' not in output) and ('D2AOD' not in output):
1900  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
1901  'No base name for DAOD output')
1902 
1903  formatList = []
1904  if 'formats' in self.conf.argdict: formatList = self.conf.argdict['formats'].value
1905  for reduction in formatList:
1906  if ('DAOD' in output):
1907  dataType = 'DAOD_' + reduction
1908  if 'augmentations' not in self.conf.argdict:
1909  outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
1910  else:
1911  for val in self.conf.argdict['augmentations'].value:
1912  if reduction in val.split(':')[0]:
1913  outputName = 'DAOD_' + val.split(':')[1] + '.' + self.conf.argdict['outputDAODFile'].value[0]
1914  break
1915  else:
1916  outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
1917 
1918  if ('D2AOD' in output):
1919  dataType = 'D2AOD_' + reduction
1920  outputName = 'D2AOD_' + reduction + '.' + self.conf.argdict['outputD2AODFile'].value[0]
1921 
1922  msg.info('Adding reduction output type {0}'.format(dataType))
1923  output.add(dataType)
1924  newReduction = trfArgClasses.argPOOLFile(outputName, io='output', runarg=True, type='AOD',
1925  name=reduction)
1926  # References to _trf - can this be removed?
1927  self.conf.dataDictionary[dataType] = newReduction
1928 
1929  # Clean up the stub file from the executor input and the transform's data dictionary
1930  # (we don't remove the actual argFile instance)
1931  if ('DAOD' in output):
1932  output.remove('DAOD')
1933  del self.conf.dataDictionary['DAOD']
1934  del self.conf.argdict['outputDAODFile']
1935  if ('D2AOD' in output):
1936  output.remove('D2AOD')
1937  del self.conf.dataDictionary['D2AOD']
1938  del self.conf.argdict['outputD2AODFile']
1939 
1940  msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
1941  msg.info('Input/Output: {0}/{1}'.format(input, output))
1942 
1945  super(reductionFrameworkExecutor, self).preExecute(input, output)
1946 
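# Illustrative sketch, not part of trfExe.py: each requested format becomes
# one DAOD_<format> output whose file name is derived from the base output
# name; an 'augmentations' entry of the form 'carrier:augmented' renames the
# file part for the matching format. All names here are hypothetical.
def reduction_outputs(formats, base_name, augmentations=()):
    outputs = {}
    for fmt in formats:
        name_part = fmt
        for aug in augmentations:
            carrier, augmented = aug.split(':', 1)
            if fmt in carrier:
                name_part = augmented
                break
        outputs['DAOD_' + fmt] = 'DAOD_{0}.{1}'.format(name_part, base_name)
    return outputs

# e.g. reduction_outputs(['PHYS'], 'out.pool.root')
# returns {'DAOD_PHYS': 'DAOD_PHYS.out.pool.root'}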
1947 
1948 
1949  class DQMergeExecutor(scriptExecutor):
1950  def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']),
1951  exe='DQHistogramMerge.py', exeArgs = [], memMonitor = True):
1953  self._histMergeList = 'HISTMergeList.txt'
1954 
1955  super(DQMergeExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
1956  exeArgs=exeArgs, memMonitor=memMonitor)
1957 
1958 
1959  def preExecute(self, input = set(), output = set()):
1960  self.setPreExeStart()
1961  msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
1962 
1963  super(DQMergeExecutor, self).preExecute(input=input, output=output)
1964 
1965  # Write the list of files to be merged
1966  with open(self._histMergeList, 'w') as DQMergeFile:
1967  for dataType in input:
1968  for fname in self.conf.dataDictionary[dataType].value:
1969  self.conf.dataDictionary[dataType]._getNumberOfEvents([fname])
1970  print(fname, file=DQMergeFile)
1971 
1972  self._cmd.append(self._histMergeList)
1973 
1974  # Add the output file
1975  if len(output) != 1:
1976  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1977  'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
1978  outDataType = list(output)[0]
1979  self._cmd.append(self.conf.dataDictionary[outDataType].value[0])
1980 
1981  # Pass run_post_processing (and, below, is_incremental_merge) as positional 'True'/'False' arguments
1982  if (self.conf._argdict.get("run_post_processing",False)):
1983  self._cmd.append('True')
1984  else:
1985  self._cmd.append('False')
1986 
1987  if (self.conf._argdict.get("is_incremental_merge",False)):
1988  self._cmd.append('True')
1989  else:
1990  self._cmd.append('False')
1991 
1992  for k in ("excludeHist","excludeDir"):
1993  if k in self.conf._argdict:
1994  self._cmd.append("--{0}={1}".format(k,self.conf._argdict[k]))
1995 
1996 
1997  def validate(self):
1998  self.setValStart()
1999  super(DQMergeExecutor, self).validate()
2000 
2001  exitErrorMessage = ''
2002  # Base class validation successful; now scan the logfile for errors it may have missed.
2003  try:
2004  logScan = trfValidation.scriptLogFileReport(self._logFileName)
2005  worstError = logScan.worstError()
2006 
2007  # In general we add the error message to the exit message, but if it's too long then don't do
2008  # that and just say look in the jobReport
2009  if worstError['firstError']:
2010  if len(worstError['firstError']['message']) > logScan._msgLimit:
2011  exitErrorMessage = "Long {0} message at line {1}" \
2012  " (see jobReport for further details)".format(worstError['level'],
2013  worstError['firstError']['firstLine'])
2014  else:
2015  exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
2016  worstError['firstError']['message'])
2017  except OSError as e:
2018  exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2019  raise trfExceptions.TransformValidationException(exitCode,
2020  'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))
2021 
2022  if worstError['nLevel'] == stdLogLevels['ERROR'] and (
2023  'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
2024  msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2025 
2026  elif worstError['nLevel'] >= stdLogLevels['ERROR']:
2027  self._isValidated = False
2028  msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
2029  exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2030  raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))
2031 
2032  # Must be ok if we got here!
2033  msg.info('Executor {0} has validated successfully'.format(self.name))
2034  self._isValidated = True
2035 
2036  self._valStop = os.times()
2037  msg.debug('valStop time is {0}'.format(self._valStop))
2038 
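# Illustrative sketch, not part of trfExe.py: the post-scan decision above
# as a pure function over the worst logfile error level.
def log_scan_verdict(n_level, error_level, ignore_errors=False):
    if n_level == error_level and ignore_errors:
        return 'warn'   # ERRORs present but explicitly waived by ignoreErrors
    if n_level >= error_level:
        return 'fail'   # fatal: a TransformLogfileErrorException is raised
    return 'ok'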
2039 
2040  class DQMPostProcessExecutor(scriptExecutor):
2041  def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']),
2042  exe='DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor = True):
2044  self._histMergeList = 'HISTMergeList.txt'
2045 
2046  super(DQMPostProcessExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
2047  exeArgs=exeArgs, memMonitor=memMonitor)
2048 
2049 
2050  def preExecute(self, input = set(), output = set()):
2051  self.setPreExeStart()
2052  msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
2053 
2054  super(DQMPostProcessExecutor, self).preExecute(input=input, output=output)
2055 
2056  #build input file list (typically only one):
2057  dsName=self.conf.argdict["inputHISTFile"].dataset
2058  inputList=[]
2059  for dataType in input:
2060  for fname in self.conf.dataDictionary[dataType].value:
2061  #if no dataset name is given, guess it from the file name
2062  if not dsName: dsName=".".join(fname.split('.')[0:4])
2063  inputList.append("#".join([dsName,fname]))
2064 
2065 
2066  if len(output) != 1:
2067  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
2068  'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
2069  outDataType = list(output)[0]
2070  #build argument json:
2071  #T0 uses keys: 'allowCOOLUpload', 'doWebDisplay', 'incrementalMode', 'inputHistFiles', 'mergeParams', 'outputHistFile', 'postProcessing', 'reverseFileOrder', 'servers'
2072  #more keys: runNumber, streamName,projectTag,filepaths,productionMode,skipMerge
2073  wrapperParams={"inputHistFiles" : inputList,
2074  "outputHistFile" : dsName+"#"+self.conf.dataDictionary[outDataType].value[0],
2075  "incrementalMode": "True" if self.conf._argdict.get("is_incremental_merge",False) else "False",
2076  "postProcessing" : "True" if self.conf._argdict.get("run_post_processing",False) else "False",
2077  "doWebDisplay" : "True" if self.conf._argdict.get("doWebDisplay",False) else "False",
2078  "allowCOOLUpload": "True" if self.conf._argdict.get("allowCOOLUpload",False) else "False",
2079  "mergeParams" : "",
2080 
2081  }
2082  if "servers" in self.conf._argdict:
2083  wrapperParams["servers"] = self.conf._argdict["servers"].value
2084 
2085  for k in ("excludeHist","excludeDir"):
2086  if k in self.conf._argdict:
2087  wrapperParams["mergeParams"]+=(" --{0}={1}".format(k,self.conf._argdict[k]))
2088 
2089 
2090  with open("args.json", "w") as f:
2091  json.dump(wrapperParams, f)
2092 
2093  self._cmd.append("--argJSON=args.json")
2094 
2095 
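# Illustrative sketch, not part of trfExe.py: the wrapper is driven by one
# JSON argument file instead of individual command line options; booleans
# travel as the strings "True"/"False" and files as 'dataset#filename'
# pairs. The values below are hypothetical ('json' is imported at the top
# of this module).
exampleParams = {
    'inputHistFiles': ['data18.physics_Main.00350000.HIST#input.HIST.root'],
    'outputHistFile': 'data18.physics_Main.00350000.HIST#merged.HIST.root',
    'incrementalMode': 'False',
    'postProcessing': 'True',
}
with open('example_args.json', 'w') as f:
    json.dump(exampleParams, f)
# invoked as: DQM_Tier0Wrapper_tf.py --argJSON=example_args.json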
2096 
2097  def validate(self):
2098  self.setValStart()
2099  super(DQMPostProcessExecutor, self).validate()
2100 
2101  exitErrorMessage = ''
2102  # Base class validation successful; now scan the logfile for errors it may have missed.
2103  try:
2104  logScan = trfValidation.scriptLogFileReport(self._logFileName)
2105  worstError = logScan.worstError()
2106 
2107  # In general we add the error message to the exit message, but if it's too long then don't do
2108  # that and just say look in the jobReport
2109  if worstError['firstError']:
2110  if len(worstError['firstError']['message']) > logScan._msgLimit:
2111  exitErrorMessage = "Long {0} message at line {1}" \
2112  " (see jobReport for further details)".format(worstError['level'],
2113  worstError['firstError']['firstLine'])
2114  else:
2115  exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
2116  worstError['firstError']['message'])
2117  except OSError as e:
2118  exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2119  raise trfExceptions.TransformValidationException(exitCode,
2120  'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))
2121 
2122  if worstError['nLevel'] == stdLogLevels['ERROR'] and (
2123  'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
2124  msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2125 
2126  elif worstError['nLevel'] >= stdLogLevels['ERROR']:
2127  self._isValidated = False
2128  msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
2129  exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2130  raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))
2131 
2132  # Must be ok if we got here!
2133  msg.info('Executor {0} has validated successfully'.format(self.name))
2134  self._isValidated = True
2135 
2136  self._valStop = os.times()
2137  msg.debug('valStop time is {0}'.format(self._valStop))
2138 
2139  class NTUPMergeExecutor(scriptExecutor):
2140 
2141  def preExecute(self, input = set(), output = set()):
2142  self.setPreExeStart()
2143  msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
2144 
2145  # Basic command, and allow overwrite of the output file
2146  if self._exe is None:
2147  self._exe = 'hadd'
2148  self._cmd = [self._exe, "-f"]
2149 
2150 
2151  # Add the output file
2152  if len(output) != 1:
2153  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
2154  'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
2155  outDataType = list(output)[0]
2156  self._cmd.append(self.conf.dataDictionary[outDataType].value[0])
2157  # Add to be merged to the cmd chain
2158  for dataType in input:
2159  self._cmd.extend(self.conf.dataDictionary[dataType].value)
2160 
2161  super(NTUPMergeExecutor, self).preExecute(input=input, output=output)
2162 
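# Illustrative sketch, not part of trfExe.py: the resulting merge command is
# plain hadd with '-f' to overwrite any existing output; the output file
# comes first and the inputs follow (file names here are hypothetical).
cmd = ['hadd', '-f', 'merged.NTUP.root', 'in1.NTUP.root', 'in2.NTUP.root']
# subprocess.call(cmd) would then perform the merge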
2163 
2164  class NtupPhysValPostProcessingExecutor(scriptExecutor):
2165  """Executor for running physvalPostProcessing.py with <input> <output> args"""
2166 
2167  def preExecute(self, input = set(), output = set()):
2168  self.setPreExeStart()
2169  msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
2170 
2171  self._cmd = [self.exe, ]
2172 
2173  if len(input) != 1 or len(output) != 1:
2174  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
2175  f'Exactly one input and one output must be specified (got inputs={len(input)}, outputs={len(output)})')
2176 
2177  self._cmd.append(self.conf.dataDictionary[list(input)[0]].value[0])
2178  self._cmd.append(self.conf.dataDictionary[list(output)[0]].value[0])
2179 
2180  # Finalize execution setup by calling the parent class method
2181  super(NtupPhysValPostProcessingExecutor, self).preExecute(input=input, output=output)
2182 
2183  ## @brief Specialise the script executor to deal with the BS merge oddity of excluding empty DRAWs
2184  class bsMergeExecutor(scriptExecutor):
2185 
2186  def preExecute(self, input = set(), output = set()):
2187  self.setPreExeStart()
2188  self._inputBS = list(input)[0]
2189  self._outputBS = list(output)[0]
2190  self._maskedFiles = []
2191  self._useStubFile = False
2192  if 'maskEmptyInputs' in self.conf.argdict and self.conf.argdict['maskEmptyInputs'].value is True:
2193  eventfullFiles = []
2194  for fname in self.conf.dataDictionary[self._inputBS].value:
2195  nEvents = self.conf.dataDictionary[self._inputBS].getSingleMetadata(fname, 'nentries')
2196  msg.debug('Found {0} events in file {1}'.format(nEvents, fname))
2197  if isinstance(nEvents, int) and nEvents > 0:
2198  eventfullFiles.append(fname)
2199  self._maskedFiles = list(set(self.conf.dataDictionary[self._inputBS].value) - set(eventfullFiles))
2200  if len(self._maskedFiles) > 0:
2201  msg.info('The following input files are masked because they have 0 events: {0}'.format(' '.join(self._maskedFiles)))
2202  if len(eventfullFiles) == 0:
2203  if 'emptyStubFile' in self.conf.argdict and path.exists(self.conf.argdict['emptyStubFile'].value):
2204  self._useStubFile = True
2205  msg.info("All input files are empty - will use stub file {0} as output".format(self.conf.argdict['emptyStubFile'].value))
2206  else:
2207  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
2208  'All input files had zero events - aborting BS merge')
2209 
2210  # Write the list of input files to a text file, so that testMergedFiles can swallow it
2211  self._mergeBSFileList = '{0}.list'.format(self._name)
2212  self._mergeBSLogfile = '{0}.out'.format(self._name)
2213  try:
2214  with open(self._mergeBSFileList, 'w') as BSFileList:
2215  for fname in self.conf.dataDictionary[self._inputBS].value:
2216  if fname not in self._maskedFiles:
2217  print(fname, file=BSFileList)
2218  except OSError as e:
2219  errMsg = 'Got an error when writing list of BS files to {0}: {1}'.format(self._mergeBSFileList, e)
2220  msg.error(errMsg)
2221  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'), errMsg)
2222 
2223  # Hope that we were given a correct filename...
2224  self._outputFilename = self.conf.dataDictionary[self._outputBS].value[0]
2225  if self._outputFilename.endswith('._0001.data'):
2226  self._doRename = False
2227  self._outputFilename = self._outputFilename.split('._0001.data')[0]
2228  elif self.conf.argdict['allowRename'].value is True:
2229  # OK, non-fatal, we go for a renaming
2230  msg.info('Output filename does not end in "._0001.data"; will proceed, but be aware that the internal filename metadata will be wrong')
2231  self._doRename = True
2232  else:
2233  # No rename allowed, so we are dead...
2234  errmsg = 'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'
2235  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'), errmsg)
2236 
2237  # Set the correct command for execution
2238  self._cmd = [self._exe, self._mergeBSFileList, '0', self._outputFilename]
2239 
2240  super(bsMergeExecutor, self).preExecute(input=input, output=output)
2241 
2242  def execute(self):
2243  if self._useStubFile:
2244  # Need to fake execution!
2245  self._exeStart = os.times()
2246  msg.debug('exeStart time is {0}'.format(self._exeStart))
2247  msg.info("Using stub file for empty BS output - execution is fake")
2248  if self._outputFilename != self.conf.argdict['emptyStubFile'].value:
2249  os.rename(self.conf.argdict['emptyStubFile'].value, self._outputFilename)
2250  self._memMonitor = False
2251  self._hasExecuted = True
2252  self._rc = 0
2253  self._exeStop = os.times()
2254  msg.debug('exeStop time is {0}'.format(self._exeStop))
2255  else:
2256  super(bsMergeExecutor, self).execute()
2257 
2258  def postExecute(self):
2259  if self._useStubFile:
2260  pass
2261  elif self._doRename:
2262  self._expectedOutput = self._outputFilename + '._0001.data'
2263  msg.info('Renaming {0} to {1}'.format(self._expectedOutput, self.conf.dataDictionary[self._outputBS].value[0]))
2264  try:
2265  os.rename(self._outputFilename + '._0001.data', self.conf.dataDictionary[self._outputBS].value[0])
2266  except OSError as e:
2267  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
2268  'Exception raised when renaming {0} to {1}: {2}'.format(self._outputFilename, self.conf.dataDictionary[self._outputBS].value[0], e))
2269  super(bsMergeExecutor, self).postExecute()
2270 
2271 
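# Illustrative sketch, not part of trfExe.py: the masking above partitions
# the inputs by their 'nentries' metadata; only event-full files reach the
# merger ('nentries' here is a hypothetical dict of file -> event count).
def mask_empty(files, nentries):
    eventful = [f for f in files if isinstance(nentries.get(f), int) and nentries[f] > 0]
    masked = [f for f in files if f not in eventful]
    return eventful, masked

# e.g. mask_empty(['a.data', 'b.data'], {'a.data': 10, 'b.data': 0})
# returns (['a.data'], ['b.data'])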
2272 
2273  class archiveExecutor(scriptExecutor):
2274 
2275  def preExecute(self, input = set(), output = set()):
2276  self.setPreExeStart()
2277  self._memMonitor = False
2278 
2279  #archiving
2280  if self._exe == 'zip':
2281  if 'outputArchFile' not in self.conf.argdict:
2282  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_MISSING'), 'Missing output file name')
2283 
2284  self._cmd = ['python']
2285  try:
2286  with open('zip_wrapper.py', 'w') as zip_wrapper:
2287  print("import zipfile, os, shutil", file=zip_wrapper)
2288  if os.path.exists(self.conf.argdict['outputArchFile'].value[0]):
2289  #appending input file(s) to existing archive
2290  print("zf = zipfile.ZipFile('{}', mode='a', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
2291  else:
2292  #creating new archive
2293  print("zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
2294  print("for f in {}:".format(self.conf.argdict['inputDataFile'].value), file=zip_wrapper)
2295  #zipfile.is_zipfile() gives false positives (as of python 3.7.0), so the name is also checked for ".zip"
2296  #print >> zip_wrapper, " if zipfile.is_zipfile(f):"
2297  print("  if zipfile.is_zipfile(f) and '.zip' in f:", file=zip_wrapper)
2298  print("    archive = zipfile.ZipFile(f, mode='r')", file=zip_wrapper)
2299  print("    print('Extracting input zip file {0} to temporary directory {1}'.format(f, 'tmp'))", file=zip_wrapper)
2300  print("    archive.extractall('tmp')", file=zip_wrapper)
2301  print("    archive.close()", file=zip_wrapper)
2302  # remove stuff as soon as it is saved to output in order to save disk space at worker node
2303  print("    if os.access(f, os.F_OK):", file=zip_wrapper)
2304  print("      print('Removing input zip file {}'.format(f))", file=zip_wrapper)
2305  print("      os.unlink(f)", file=zip_wrapper)
2306  print("    if os.path.isdir('tmp'):", file=zip_wrapper)
2307  print("      for root, dirs, files in os.walk('tmp'):", file=zip_wrapper)
2308  print("        for name in files:", file=zip_wrapper)
2309  print("          print('Zipping {}'.format(name))", file=zip_wrapper)
2310  print("          zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
2311  print("      shutil.rmtree('tmp')", file=zip_wrapper)
2312  print("  else:", file=zip_wrapper)
2313  print("    print('Zipping {}'.format(os.path.basename(f)))", file=zip_wrapper)
2314  print("    zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
2315  print("    if os.access(f, os.F_OK):", file=zip_wrapper)
2316  print("      print('Removing input file {}'.format(f))", file=zip_wrapper)
2317  print("      os.unlink(f)", file=zip_wrapper)
2318  print("zf.close()", file=zip_wrapper)
2319  os.chmod('zip_wrapper.py', 0o755)
2320  except OSError as e:
2321  errMsg = 'error writing zip wrapper {fileName}: {error}'.format(fileName = 'zip_wrapper.py',
2322  error = e
2323  )
2324  msg.error(errMsg)
2325  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
2326  errMsg
2327  )
2328  self._cmd.append('zip_wrapper.py')
2329 
2330  #unarchiving
2331  elif self._exe == 'unarchive':
2332  import zipfile
2333  for infile in self.conf.argdict['inputArchFile'].value:
2334  if not zipfile.is_zipfile(infile):
2335  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
2336  'An input file is not a zip archive - aborting unpacking')
2337  self._cmd = ['python']
2338  try:
2339  with open('unarchive_wrapper.py', 'w') as unarchive_wrapper:
2340  print("import zipfile", file=unarchive_wrapper)
2341  print("for f in {}:".format(self.conf.argdict['inputArchFile'].value), file=unarchive_wrapper)
2342  print("  archive = zipfile.ZipFile(f, mode='r')", file=unarchive_wrapper)
2343  print("  path = '{}'".format(self.conf.argdict['path']), file=unarchive_wrapper)
2344  print("  print('Extracting archive {0} to {1}'.format(f, path))", file=unarchive_wrapper)
2345  print("  archive.extractall(path)", file=unarchive_wrapper)
2346  print("  archive.close()", file=unarchive_wrapper)
2347  os.chmod('unarchive_wrapper.py', 0o755)
2348  except OSError as e:
2349  errMsg = 'error writing unarchive wrapper {fileName}: {error}'.format(fileName = 'unarchive_wrapper.py',
2350  error = e
2351  )
2352  msg.error(errMsg)
2353  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
2354  errMsg
2355  )
2356  self._cmd.append('unarchive_wrapper.py')
2357  super(archiveExecutor, self).preExecute(input=input, output=output)
2358 
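# Illustrative sketch, not part of trfExe.py: the generated zip wrapper
# boils down to this helper - append to an existing archive or create a new
# one, storing members uncompressed (ZIP_STORED) with allowZip64 enabled for
# archives beyond 4 GB ('os' is imported at the top of this module).
import zipfile

def add_to_archive(archive_name, files):
    mode = 'a' if os.path.exists(archive_name) else 'w'
    with zipfile.ZipFile(archive_name, mode=mode, allowZip64=True) as zf:
        for f in files:
            zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)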
python.trfExe.logscanExecutor._logFileName
_logFileName
Definition: trfExe.py:510
python.trfExe.transformExecutor._isValidated
_isValidated
Definition: trfExe.py:176
python.trfExe.echoExecutor.execute
def execute(self)
Definition: trfExe.py:580
python.trfExe.athenaExecutor._errorMaskFiles
_errorMaskFiles
Definition: trfExe.py:894
python.trfExe.transformExecutor._hasValidated
_hasValidated
Definition: trfExe.py:175
python.trfExe.athenaExecutor
Definition: trfExe.py:844
python.trfExe.athenaExecutor._runtimeRunargs
_runtimeRunargs
Definition: trfExe.py:891
python.trfExe.executorConfig.addToArgdict
def addToArgdict(self, key, value)
Add a new object to the argdict.
Definition: trfExe.py:118
python.trfUtils.isInteractiveEnv
def isInteractiveEnv()
Definition: trfUtils.py:703
python.trfExe.DQMPostProcessExecutor
Specialist execution class for DQM post-processing of histograms.
Definition: trfExe.py:2040
python.trfExe.athenaExecutor.disableMT
def disableMT(self)
Definition: trfExe.py:950
python.trfExe.executorConfig._argdict
_argdict
Definition: trfExe.py:64
python.trfExe.transformExecutor._athenaConcurrentEvents
_athenaConcurrentEvents
Definition: trfExe.py:195
python.trfExe.scriptExecutor._exeLogFile
_exeLogFile
Definition: trfExe.py:697
python.trfExe.transformExecutor.errMsg
def errMsg(self)
Definition: trfExe.py:304
python.trfExe.athenaExecutor._setupFile
_setupFile
Definition: trfExe.py:1574
python.trfExe.athenaExecutor._prepAthenaCommandLine
def _prepAthenaCommandLine(self)
Prepare the correct command line to be used to invoke athena.
Definition: trfExe.py:1454
python.trfExe.athenaExecutor._extraRunargs
_extraRunargs
Definition: trfExe.py:890
python.trfExe.transformExecutor.execute
def execute(self)
Definition: trfExe.py:475
python.trfExe.DQMPostProcessExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2050
python.trfMPTools.athenaMPOutputHandler
def athenaMPOutputHandler(athenaMPFileReport, athenaMPWorkerTopDir, dataDictionary, athenaMPworkers, skipFileChecks=False, argdict={})
Handle AthenaMP outputs, updating argFile instances to real.
Definition: trfMPTools.py:70
python.trfExe.DQMergeExecutor.validate
def validate(self)
Definition: trfExe.py:1997
python.trfExe.echoExecutor.__init__
def __init__(self, name='Echo', trf=None)
Definition: trfExe.py:574
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
python.trfExe.transformExecutor.myMerger
def myMerger(self)
Now define properties for these data members.
Definition: trfExe.py:207
python.trfExe.transformExecutor
Executors always only even execute a single step, as seen by the transform.
Definition: trfExe.py:127
python.trfExe.athenaExecutor._inputDataTypeCountCheck
_inputDataTypeCountCheck
Definition: trfExe.py:895
python.trfExe.DQMergeExecutor.__init__
def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']), exe='DQHistogramMerge.py', exeArgs=[], memMonitor=True)
Definition: trfExe.py:1950
python.trfExe.transformExecutor._memFullFile
_memFullFile
Definition: trfExe.py:191
python.trfExe.NTUPMergeExecutor
Specialist execution class for merging NTUPLE files.
Definition: trfExe.py:2139
python.trfExe.logscanExecutor.validate
def validate(self)
Definition: trfExe.py:518
python.trfExe.executorConfig.setFromTransform
def setFromTransform(self, trf)
Set configuration properties from the parent transform.
Definition: trfExe.py:113
python.trfExe.POOLMergeExecutor.__init__
def __init__(self, name='hybridPOOLMerge', trf=None, conf=None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton', inData=set(), outData=set(), exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, manualDataDictionary=None, memMonitor=True)
Initialise hybrid POOL merger athena executor.
Definition: trfExe.py:1864
python.trfArgClasses.argPOOLFile
POOL file class.
Definition: trfArgClasses.py:1440
python.trfExe.transformExecutor.preExeStartTimes
def preExeStartTimes(self)
Definition: trfExe.py:332
python.trfExceptions.TransformSetupException
Setup exceptions.
Definition: trfExceptions.py:42
python.trfExe.POOLMergeExecutor.execute
def execute(self)
Definition: trfExe.py:1880
python.trfExe.athenaExecutor._skeleton
_skeleton
Definition: trfExe.py:909
python.trfExe.athenaExecutor._skeletonCA
_skeletonCA
Definition: trfExe.py:901
python.trfExe.transformExecutor.cpuTimeTotal
def cpuTimeTotal(self)
Definition: trfExe.py:430
vtune_athena.format
format
Definition: vtune_athena.py:14
python.trfExe.transformExecutor.validate
def validate(self)
Definition: trfExe.py:489
python.trfExe.executorConfig.dataDictionary
def dataDictionary(self)
Definition: trfExe.py:79
python.trfExe.transformExecutor._validation
_validation
Definition: trfExe.py:313
python.trfExe.transformExecutor.inData
inData
Definition: trfExe.py:251
python.trfExe.athenaExecutor._athenaMPReadEventOrders
_athenaMPReadEventOrders
Definition: trfExe.py:1112
python.trfExe.DQMergeExecutor
Specialist execution class for merging DQ histograms.
Definition: trfExe.py:1949
python.trfExe.dummyExecutor.execute
def execute(self)
Definition: trfExe.py:602
python.trfExe.logscanExecutor.__init__
def __init__(self, name='Logscan')
Definition: trfExe.py:507
python.trfExe.DQMergeExecutor._histMergeList
_histMergeList
Definition: trfExe.py:1952
python.trfExe.logscanExecutor
Special executor that will enable a logfile scan as part of its validation.
Definition: trfExe.py:506
python.trfExe.transformExecutor._myMerger
_myMerger
Definition: trfExe.py:202
python.trfExe.transformExecutor._rc
_rc
Definition: trfExe.py:171
PyJobTransforms.trfJobOptions
Contains functions related Athena Job Options files.
python.trfExe.transformExecutor._valStop
_valStop
Definition: trfExe.py:188
index
Definition: index.py:1
python.trfExe.DQMPostProcessExecutor.validate
def validate(self)
Definition: trfExe.py:2097
python.trfExe.scriptExecutor.validate
def validate(self)
Definition: trfExe.py:801
python.trfExe.transformExecutor.athenaMP
def athenaMP(self)
Definition: trfExe.py:452
python.trfExe.NtupPhysValPostProcessingExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2167
python.trfExe.bsMergeExecutor
Specalise the script executor to deal with the BS merge oddity of excluding empty DRAWs.
Definition: trfExe.py:2184
python.trfExe.athenaExecutor._onlyMPWithRunargs
_onlyMPWithRunargs
Definition: trfExe.py:900
min
constexpr double min()
Definition: ap_fixedTest.cxx:26
python.trfUtils.calcCpuTime
def calcCpuTime(start, stop)
Definition: trfUtils.py:1684
python.trfExe.athenaExecutor._asetup
_asetup
Definition: trfExe.py:1568
python.trfExe.athenaExecutor._topOptionsFiles
_topOptionsFiles
Handle MPI setup.
Definition: trfExe.py:1188
python.trfExe.transformExecutor._hasExecuted
_hasExecuted
Definition: trfExe.py:170
python.trfExe.transformExecutor._eventCount
_eventCount
Definition: trfExe.py:192
python.trfExe.executorConfig.firstExecutor
def firstExecutor(self)
Definition: trfExe.py:87
python.trfExe.scriptExecutor._echoOutput
_echoOutput
Definition: trfExe.py:632
python.trfExe.transformExecutor.output
def output(self)
Definition: trfExe.py:285
python.trfUtils.ValgrindCommand
def ValgrindCommand(defaultOptions=True, extraOptionsList=None, AthenaSerialisedConfigurationFile="athenaConf.pkl", returnFormat="string")
Definition: trfUtils.py:1577
python.trfExe.logscanExecutor._logScan
_logScan
TODO: This is a cut'n'paste from the athenaExecutor We really should factorise this and use it common...
Definition: trfExe.py:537
python.trfExe._encoding_stream
def _encoding_stream(s)
Definition: trfExe.py:45
python.trfExe.transformExecutor.validationWallTime
def validationWallTime(self)
Definition: trfExe.py:423
python.trfExe.POOLMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:1875
python.trfExe.scriptExecutor._echologger
_echologger
Definition: trfExe.py:692
python.trfExe.scriptExecutor._echostream
_echostream
Definition: trfExe.py:702
python.trfMTTools.detectAthenaMTThreads
def detectAthenaMTThreads(argdict={}, currentSubstep='', legacyThreadingRelease=False)
Detect if AthenaMT has been requested.
Definition: trfMTTools.py:23
python.trfExe.reductionFrameworkExecutor
Specialist executor to manage the handling of multiple implicit input and output files within the der...
Definition: trfExe.py:1887
python.trfExe.bsMergeExecutor._mergeBSLogfile
_mergeBSLogfile
Definition: trfExe.py:2212
python.trfExe.transformExecutor.postExeWallTime
def postExeWallTime(self)
Definition: trfExe.py:409
athena.value
value
Definition: athena.py:124
python.trfExe.transformExecutor.memStats
def memStats(self)
Definition: trfExe.py:394
python.trfExe.reductionFrameworkExecutor.preExecute
def preExecute(self, input=set(), output=set())
Take inputDAODFile and setup the actual outputs needed in this job.
Definition: trfExe.py:1890
python.trfExe.transformExecutor.preExeCpuTime
def preExeCpuTime(self)
Definition: trfExe.py:352
PyJobTransforms.trfArgClasses
Transform argument class definitions.
python.trfExe.transformExecutor.__init__
def __init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Base class initaliser for transform executors.
Definition: trfExe.py:138
python.trfExe.transformExecutor._outData
_outData
Definition: trfExe.py:145
PyJobTransforms.trfExitCodes
Module for transform exit codes.
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.trfExe.scriptExecutor._exe
_exe
Definition: trfExe.py:622
python.trfEnv.environmentUpdate
Class holding the update to an environment that will be passed on to an executor.
Definition: trfEnv.py:15
python.trfExe.transformExecutor.extraMetadata
def extraMetadata(self)
Definition: trfExe.py:292
intersection
std::vector< std::string > intersection(std::vector< std::string > &v1, std::vector< std::string > &v2)
Definition: compareFlatTrees.cxx:25
python.trfExe.transformExecutor.setValStart
def setValStart(self)
Definition: trfExe.py:466
python.trfUtils.reportEventsPassedSimFilter
def reportEventsPassedSimFilter(log)
summarize events passed the ISF_SimEventFilter @detail this function sums up all events passed the IS...
Definition: trfUtils.py:1717
python.trfExe.transformExecutor._extraMetadata
_extraMetadata
Definition: trfExe.py:181
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
python.trfExe.bsMergeExecutor._expectedOutput
_expectedOutput
Definition: trfExe.py:2262
python.trfExe.athenaExecutor.validate
def validate(self)
Definition: trfExe.py:1321
python.trfExe.athenaExecutor._originalCmd
_originalCmd
Definition: trfExe.py:1567
python.trfExe.executorConfig.addToDataDictionary
def addToDataDictionary(self, key, value)
Add a new object to the dataDictionary.
Definition: trfExe.py:122
python.trfExe.DQMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:1959
python.trfExe.NtupPhysValPostProcessingExecutor
Specialist execution class for running post processing on merged PHYVAL NTUPLE file.
Definition: trfExe.py:2164
python.trfExe.executorConfig.totalExecutorSteps
def totalExecutorSteps(self)
Definition: trfExe.py:103
python.trfExe.archiveExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2275
python.trfExe.transformExecutor.valStartTimes
def valStartTimes(self)
Definition: trfExe.py:344
python.trfExe.transformExecutor.hasExecuted
def hasExecuted(self)
Definition: trfExe.py:296
python.trfExe.athenaExecutor._envUpdate
_envUpdate
Add input/output file information - this can't be done in init as we don't know what our inputs and o...
Definition: trfExe.py:1226
python.trfExe.bsMergeExecutor._doRename
_doRename
Definition: trfExe.py:2226
python.trfExe.transformExecutor._exeStop
_exeStop
Definition: trfExe.py:187
python.trfExe.dummyExecutor
Definition: trfExe.py:595
python.trfExe.athenaExecutor._wrapperFile
_wrapperFile
Definition: trfExe.py:1573
python.trfExceptions.TransformExecutionException
Base class for execution exceptions.
Definition: trfExceptions.py:62
python.trfExe.transformExecutor.wallTime
def wallTime(self)
Definition: trfExe.py:387
PixelModuleFeMask_create_db.remove
string remove
Definition: PixelModuleFeMask_create_db.py:83
python.trfExe.scriptExecutor.execute
def execute(self)
Definition: trfExe.py:723
python.trfExe.transformExecutor.wallTimeTotal
def wallTimeTotal(self)
Definition: trfExe.py:437
python.trfExe.transformExecutor._valStart
_valStart
Definition: trfExe.py:188
python.trfExe.scriptExecutor.exeArgs
def exeArgs(self)
Definition: trfExe.py:650
python.trfExe.executorConfig._firstExecutor
_firstExecutor
Definition: trfExe.py:66
python.trfExe.transformExecutor.inDataUpdate
def inDataUpdate(self, value)
Definition: trfExe.py:245
python.trfUtils.setupDBRelease
def setupDBRelease(setup)
Run a DBRelease setup.
Definition: trfUtils.py:543
python.trfExe.executorConfig.__init__
def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False)
Configuration for an executor.
Definition: trfExe.py:63
python.trfExe.executorConfig._executorStep
_executorStep
Definition: trfExe.py:67
python.trfExe.athenaExecutor.name
name
Definition: trfExe.py:1295
python.trfExe.DQMPostProcessExecutor.__init__
def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']), exe='DQM_Tier0Wrapper_tf.py', exeArgs=[], memMonitor=True)
Definition: trfExe.py:2041
python.trfExe.transformExecutor._containerSetup
_containerSetup
Definition: trfExe.py:199
python.trfExe.transformExecutor.eventCount
def eventCount(self)
Definition: trfExe.py:444
python.trfExe.athenaExecutor._workdir
_workdir
Definition: trfExe.py:1571
fillPileUpNoiseLumi.next
next
Definition: fillPileUpNoiseLumi.py:52
python.trfExe.scriptExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:658
PyJobTransforms.trfMPTools
Utilities for handling AthenaMP jobs.
python.trfExe.logscanExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:512
python.trfExe.athenaExecutor._smartMerge
def _smartMerge(self, fileArg)
Manage smart merging of output files.
Definition: trfExe.py:1748
python.trfUtils.asetupReleaseIsOlderThan
def asetupReleaseIsOlderThan(asetup_string, major, minor=None)
Test (to the best of our knowledge) if the asetup release is older than a major, minor version number...
Definition: trfUtils.py:303
python.trfValidation.ignorePatterns
Class of patterns that can be ignored from athena logfiles.
Definition: trfValidation.py:100
python.trfExe.logscanExecutor._errorMaskFiles
_errorMaskFiles
Definition: trfExe.py:509
python.trfExe.transformExecutor.outDataUpdate
def outDataUpdate(self, value)
Definition: trfExe.py:265
python.trfExe.transformExecutor.conf
conf
Executor configuration:
Definition: trfExe.py:163
python.trfExe.athenaExecutor._tryDropAndReload
_tryDropAndReload
Definition: trfExe.py:889
python.trfExe.athenaExecutor._writeAthenaWrapper
def _writeAthenaWrapper(self, asetup=None, dbsetup=None, ossetup=None)
Write a wrapper script which runs asetup and then Athena.
Definition: trfExe.py:1566
python.trfExe.transformExecutor.input
def input(self)
Definition: trfExe.py:276
plotBeamSpotVxVal.range
range
Definition: plotBeamSpotVxVal.py:194
python.trfExe.scriptExecutor
Definition: trfExe.py:619
python.trfExe.athenaExecutor._literalRunargs
_literalRunargs
Definition: trfExe.py:892
python.trfExe.executorConfig._dataDictionary
_dataDictionary
Definition: trfExe.py:65
add
bool add(const std::string &hname, TKey *tobj)
Definition: fastadd.cxx:55
python.trfArgClasses.argSubstepList
Argument class for substep lists, suitable for preExec/postExec.
Definition: trfArgClasses.py:2056
python.trfExe.athenaExecutor._athenaMPStrategy
_athenaMPStrategy
Definition: trfExe.py:1118
python.trfValidation.athenaLogFileReport
Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGL...
Definition: trfValidation.py:211
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.trfExe.transformExecutor.postExeCpuTime
def postExeCpuTime(self)
Definition: trfExe.py:402
python.trfExe.transformExecutor._athenaMP
_athenaMP
Definition: trfExe.py:193
python.trfExe.executorConfig.executorStep
def executorStep(self)
Definition: trfExe.py:95
python.trfExe.echoExecutor
Definition: trfExe.py:573
python.trfExe.transformExecutor._exeStart
_exeStart
Definition: trfExe.py:187
python.trfExe.athenaExecutor._logScan
_logScan
Definition: trfExe.py:1372
python.trfExe.transformExecutor.exeStopTimes
def exeStopTimes(self)
Definition: trfExe.py:340
python.trfExe.athenaExecutor._disableMP
_disableMP
Definition: trfExe.py:897
python.trfExe.scriptExecutor._memSummaryFile
_memSummaryFile
Definition: trfExe.py:749
python.trfExe.scriptExecutor.__init__
def __init__(self, name='Script', trf=None, conf=None, inData=set(), outData=set(), exe=None, exeArgs=None, memMonitor=True)
Definition: trfExe.py:620
python.trfExe.athenaExecutor.disableMP
def disableMP(self)
Definition: trfExe.py:942
python.trfUtils.VTuneCommand
def VTuneCommand(defaultOptions=True, extraOptionsList=None, AthenaCommand=["athena.py", "athenaConf.pkl"], returnFormat="string")
return VTune command @detail This function returns a VTune command for use with Athena.
Definition: trfUtils.py:1643
python.trfExe.scriptExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:788
PyJobTransforms.trfValidation
Validation control for job transforms.
beamspotman.dir
string dir
Definition: beamspotman.py:619
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
python.trfExceptions.TransformValidationException
Group of validation exceptions.
Definition: trfExceptions.py:50
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:26
python.trfExe.transformExecutor.dbMonitor
def dbMonitor(self)
Definition: trfExe.py:456
python.trfExe.transformExecutor.memAnalysis
def memAnalysis(self)
Definition: trfExe.py:398
python.trfExe.scriptExecutor._buildStandardCommand
def _buildStandardCommand(self)
Definition: trfExe.py:706
python.trfExe.executorConfig.argdict
def argdict(self)
Definition: trfExe.py:71
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.trfExe.optionalAthenaExecutor
Athena executor where failure is not considered fatal.
Definition: trfExe.py:1835
python.trfExe.athenaExecutor._isCAEnabled
def _isCAEnabled(self)
Check whether the executor is running with CA (ComponentAccumulator) configuration.
Definition: trfExe.py:1434
python.trfExe.bsMergeExecutor._outputFilename
_outputFilename
Definition: trfExe.py:2224
python.trfExe.executorConfig._totalExecutorSteps
_totalExecutorSteps
Definition: trfExe.py:68
python.trfExe.transformExecutor._athenaMT
_athenaMT
Definition: trfExe.py:194
python.trfExe.athenaExecutor._substep
_substep
Definition: trfExe.py:887
python.trfExe.dummyExecutor.__init__
def __init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Base class initialiser for transform executors.
Definition: trfExe.py:596
python.trfExe.transformExecutor.valStopTimes
def valStopTimes(self)
Definition: trfExe.py:348
PyJobTransforms.trfMTTools
Utilities for handling AthenaMT jobs.
python.trfExe.transformExecutor._preExeStart
_preExeStart
Definition: trfExe.py:186
python.trfExe.transformExecutor.trf
def trf(self)
Definition: trfExe.py:225
python.trfExe.archiveExecutor
Archive transform.
Definition: trfExe.py:2273
python.trfExe.athenaExecutor._jobOptionsTemplate
_jobOptionsTemplate
Definition: trfExe.py:921
python.trfExe.athenaExecutor._onlyMP
_onlyMP
Do we need to run asetup first? For a certain substep (e.g.
Definition: trfExe.py:898
python.trfExe.athenaExecutor._athenaMPEventOrdersFile
_athenaMPEventOrdersFile
Definition: trfExe.py:1110
python.trfExe.transformExecutor.sysTime
def sysTime(self)
Definition: trfExe.py:380
python.trfExeStepTools.commonExecutorStepName
def commonExecutorStepName(name)
Definition: trfExeStepTools.py:7
python.trfExe.scriptExecutor._input
_input
Definition: trfExe.py:662
python.trfExe.transformExecutor.cpuTime
def cpuTime(self)
Definition: trfExe.py:366
python.trfMPTools.detectAthenaMPProcs
def detectAthenaMPProcs(argdict={}, currentSubstep='', legacyThreadingRelease=False)
Detect if AthenaMP has been requested.
Definition: trfMPTools.py:27
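A probe sketch, assuming PyJobTransforms is importable; setting ATHENA_PROC_NUMBER is the usual way to request AthenaMP workers, and the exact shape of the return value is an assumption here:
    import os
    from PyJobTransforms.trfMPTools import detectAthenaMPProcs
    os.environ['ATHENA_PROC_NUMBER'] = '8'  # request 8 AthenaMP workers (assumed convention)
    procs = detectAthenaMPProcs(argdict={}, currentSubstep='r2e')
    print(procs)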
python.trfExe.athenaExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:1241
python.trfExe.athenaExecutor.onlyMT
def onlyMT(self)
Definition: trfExe.py:966
python.trfUtils.calcWallTime
def calcWallTime(start, stop)
Definition: trfUtils.py:1692
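A timing sketch; trfExe records its timestamps with os.times(), so the assumption here is that calcWallTime takes two such snapshots and returns the elapsed wall-clock seconds:
    import os, time
    from PyJobTransforms.trfUtils import calcWallTime
    start = os.times()
    time.sleep(1)          # stand-in for real work
    stop = os.times()
    print(calcWallTime(start, stop))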
python.trfExe.transformExecutor._alreadyInContainer
_alreadyInContainer
Definition: trfExe.py:198
python.trfUtils.cvmfsDBReleaseCheck
def cvmfsDBReleaseCheck(dbrelease)
Validate that a DBRelease exists on CVMFS and return the path to the setup script.
Definition: trfUtils.py:569
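A usage sketch, assuming a node with the ATLAS CVMFS repositories mounted; the DBRelease version string is illustrative:
    from PyJobTransforms.trfUtils import cvmfsDBReleaseCheck
    # Returns the path to the DBRelease setup script; presumably raises a
    # transform exception if the release is absent (an assumption).
    setupScript = cvmfsDBReleaseCheck('31.8.1')
    print(setupScript)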
Trk::open
@ open
Definition: BinningType.h:40
python.trfExe.DQMPostProcessExecutor._histMergeList
_histMergeList
Definition: trfExe.py:2043
python.trfExe.transformExecutor.isValidated
def isValidated(self)
Definition: trfExe.py:320
python.trfExe.transformExecutor._errMsg
_errMsg
Definition: trfExe.py:172
python.trfExe.transformExecutor._inData
_inData
Definition: trfExe.py:144
python.trfExe.transformExecutor.setPreExeStart
def setPreExeStart(self)
Definition: trfExe.py:461
PyJobTransforms.trfMPITools
Utilities for handling MPI-Athena jobs.
ActsTrk::detail::MakeDerivedVariant::extend
constexpr std::variant< Args..., T > extend(const std::variant< Args... > &, const T &)
Definition: MakeDerivedVariant.h:17
python.trfExe.executorConfig
Definition: trfExe.py:57
python.trfExe.athenaExecutor.__init__
def __init__(self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
Initialise athena executor.
Definition: trfExe.py:885
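A construction sketch using a few of the keyword arguments above; the skeleton module, substep alias and data types are illustrative, not a real reconstruction configuration:
    from PyJobTransforms.trfExe import athenaExecutor
    exe = athenaExecutor(name='RAWtoESD',
                         skeletonCA='RecJobTransforms.RecoSteering',  # hypothetical CA skeleton
                         substep='r2e',
                         inData={'RAW'},
                         outData={'ESD'})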
PyJobTransforms.trfUtils
Transform utility functions.
python.trfExe.athenaExecutor._targzipJiveXML
def _targzipJiveXML(self)
Definition: trfExe.py:1812
python.trfExe.athenaExecutor.onlyMP
def onlyMP(self)
Definition: trfExe.py:958
python.trfExe.transformExecutor.preExeWallTime
def preExeWallTime(self)
Definition: trfExe.py:359
python.trfExe.bsMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2186
python.trfExe.POOLMergeExecutor
Definition: trfExe.py:1852
PyJobTransforms.trfLogger
Logging configuration for ATLAS job transforms.
python.trfExe.transformExecutor.first
def first(self)
Definition: trfExe.py:325
python.trfExe.transformExecutor.rc
def rc(self)
Definition: trfExe.py:300
python.trfExe.transformExecutor.usrTime
def usrTime(self)
Definition: trfExe.py:373
python.trfExe.transformExecutor.outData
outData
Definition: trfExe.py:271
python.trfExe.bsMergeExecutor._inputBS
_inputBS
Definition: trfExe.py:2188
python.trfExe.transformExecutor._dbMonitor
_dbMonitor
Definition: trfExe.py:196
python.trfExceptions.TransformLogfileErrorException
Exception class for validation failures detected by parsing logfiles.
Definition: trfExceptions.py:58
python.trfExe.optionalAthenaExecutor.validate
def validate(self)
Definition: trfExe.py:1838
python.trfExe.scriptExecutor._memMonitor
_memMonitor
Definition: trfExe.py:637
python.trfExe.athenaExecutor._athenaMPWorkerTopDir
_athenaMPWorkerTopDir
Definition: trfExe.py:1108
python.trfExe.athenaExecutor.inputDataTypeCountCheck
def inputDataTypeCountCheck(self)
Definition: trfExe.py:930
python.trfExe.bsMergeExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:2258
pickleTool.object
object
Definition: pickleTool.py:29
python.trfExe.bsMergeExecutor._useStubFile
_useStubFile
Definition: trfExe.py:2191
python.trfExe.scriptExecutor._cmd
_cmd
Definition: trfExe.py:635
python.trfExe.athenaExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:973
str
Definition: BTagTrackIpAccessor.cxx:11
python.trfExe.athenaExecutor._inputEventTest
_inputEventTest
Definition: trfExe.py:888
python.trfExe.transformExecutor.name
def name(self)
Definition: trfExe.py:211
python.trfExe.NTUPMergeExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:2141
python.trfExe.transformExecutor.validation
def validation(self)
Definition: trfExe.py:308
python.trfExe.transformExecutor.postExecute
def postExecute(self)
Definition: trfExe.py:486
python.trfExe.transformExecutor.substep
def substep(self)
Definition: trfExe.py:219
python.trfExe.athenaExecutor._onlyMT
_onlyMT
Definition: trfExe.py:899
python.trfExe.scriptExecutor._exeArgs
_exeArgs
Definition: trfExe.py:625
python.trfExe.scriptExecutor._logFileName
_logFileName
Definition: trfExe.py:671
python.trfExe.athenaExecutor._dbsetup
_dbsetup
Definition: trfExe.py:1569
python.trfExe.scriptExecutor.exe
def exe(self)
Definition: trfExe.py:641
python.trfExe.transformExecutor.hasValidated
def hasValidated(self)
Definition: trfExe.py:316
python.trfExe.transformExecutor.doAll
def doAll(self, input=set(), output=set())
Convenience function that runs preExecute, execute, postExecute and validate in sequence.
Definition: trfExe.py:499
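A standalone-testing sketch, assuming doAll() chains the lifecycle methods as described above; the executor and data types are the illustrative ones from the athenaExecutor sketch earlier:
    # Run the whole lifecycle of a previously constructed executor in one call.
    exe.doAll(input={'RAW'}, output={'ESD'})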
python.trfExe.scriptExecutor._output
_output
Definition: trfExe.py:663
python.trfExe.transformExecutor._memStats
_memStats
Definition: trfExe.py:189
python.trfExe.transformExecutor._name
_name
Definition: trfExe.py:141
python.trfExe.bsMergeExecutor._maskedFiles
_maskedFiles
Definition: trfExe.py:2190
python.trfUtils.forceToAlphaNum
def forceToAlphaNum(string)
Strip a string down to alpha-numeric characters only.
Definition: trfUtils.py:443
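A quick sketch of the sanitiser described above; the input string is illustrative:
    from PyJobTransforms.trfUtils import forceToAlphaNum
    # Strip everything but alphanumeric characters from an executor name.
    print(forceToAlphaNum('RAWtoESD (step 1)'))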
python.trfExe.athenaExecutor._disableMT
_disableMT
Definition: trfExe.py:896
python.ParticleTypeUtil.info
def info
Definition: ParticleTypeUtil.py:87
python.trfExe.transformExecutor._memLeakResult
_memLeakResult
Definition: trfExe.py:190
python.trfExe.athenaExecutor._perfMonFile
_perfMonFile
Definition: trfExe.py:904
python.trfExe.athenaExecutor._dataArgs
_dataArgs
Definition: trfExe.py:893
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
python.trfValidation.scriptLogFileReport
Definition: trfValidation.py:709
python.trfUtils.unpackDBRelease
def unpackDBRelease(tarball, dbversion=None)
Ensure that the DBRelease tarball has been unpacked.
Definition: trfUtils.py:520
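A usage sketch, assuming the tarball sits in the current directory; the filename is illustrative:
    from PyJobTransforms.trfUtils import unpackDBRelease
    # Unpack the DBRelease tarball unless it has been unpacked already.
    unpackDBRelease('DBRelease-31.8.1.tar.gz')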
python.trfExe.bsMergeExecutor._outputBS
_outputBS
Definition: trfExe.py:2189
python.trfExe.transformExecutor.validationCpuTime
def validationCpuTime(self)
Definition: trfExe.py:416
python.trfExe.transformExecutor._resimevents
_resimevents
Definition: trfExe.py:197
python.trfUtils.asetupReport
def asetupReport()
Return a string with a report of the current athena setup.
Definition: trfUtils.py:223
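A logging sketch; this only produces a meaningful report inside an asetup'd ATLAS environment:
    from PyJobTransforms.trfUtils import asetupReport
    # Record the current release setup in the transform log.
    print(asetupReport())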
python.trfExe.athenaExecutor.substep
def substep(self)
Definition: trfExe.py:938
python.trfExe.transformExecutor._trf
_trf
Definition: trfExe.py:232
python.trfExe.transformExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trfExe.py:471
python.trfExe.athenaExecutor._athenaMPFileReport
_athenaMPFileReport
Definition: trfExe.py:1109
python.trfExe.transformExecutor.exeStartTimes
def exeStartTimes(self)
Definition: trfExe.py:336
python.trfExe.transformExecutor.reSimEvent
def reSimEvent(self)
Definition: trfExe.py:448
python.trfExe.bsMergeExecutor._mergeBSFileList
_mergeBSFileList
Definition: trfExe.py:2211
python.trfValidation.eventMatch
Small class used for validating event counts between input and output files.
Definition: trfValidation.py:943
python.trfExe.bsMergeExecutor.execute
def execute(self)
Definition: trfExe.py:2242