ATLAS Offline Software
Loading...
Searching...
No Matches
trfExe.py
Go to the documentation of this file.
1# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2
8
9import json
10import math
11import os
12import os.path as path
13import re
14import signal
15import subprocess
16import sys
17import time
18
19import logging
20from fnmatch import fnmatch
21msg = logging.getLogger(__name__)
22
23from PyJobTransforms.trfJobOptions import JobOptionsTemplate
24from PyJobTransforms.trfUtils import asetupReport, asetupReleaseIsOlderThan, unpackDBRelease, setupDBRelease, \
25 cvmfsDBReleaseCheck, forceToAlphaNum, \
26 ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
27from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix
28from PyJobTransforms.trfExitCodes import trfExit
29from PyJobTransforms.trfLogger import stdLogLevels
30from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler
31from PyJobTransforms.trfMTTools import detectAthenaMTThreads
32
33import PyJobTransforms.trfExceptions as trfExceptions
34import PyJobTransforms.trfValidation as trfValidation
35import PyJobTransforms.trfArgClasses as trfArgClasses
36import PyJobTransforms.trfEnv as trfEnv
38
39
40# Depending on the setting of LANG, sys.stdout may end up with ascii or ansi
41# encoding, rather than utf-8. But Athena uses unicode for some log messages
42# (another example of Gell-Mann's totalitarian principle) and that will result
43# in a crash with python 3. In such a case, force the use of a utf-8 encoded
44# output stream instead.
46 enc = s.encoding.lower()
47 if enc.find('ascii') >= 0 or enc.find('ansi') >= 0:
48 return open (s.fileno(), 'w', encoding='utf-8')
49 return s
50
51
52
58
59
63 def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False):
64 self._argdict = argdict
65 self._dataDictionary = dataDictionary
66 self._firstExecutor = firstExecutor
67 self._executorStep = -1
69
70 @property
71 def argdict(self):
72 return self._argdict
73
74 @argdict.setter
75 def argdict(self, value):
76 self._argdict = value
77
78 @property
79 def dataDictionary(self):
80 return self._dataDictionary
81
82 @dataDictionary.setter
83 def dataDictionary(self, value):
84 self._dataDictionary = value
85
86 @property
87 def firstExecutor(self):
88 return self._firstExecutor
89
90 @firstExecutor.setter
91 def firstExecutor(self, value):
92 self._firstExecutor = value
93
94 @property
95 def executorStep(self):
96 return self._executorStep
97
98 @executorStep.setter
99 def executorStep(self, value):
100 self._executorStep = value
101
102 @property
104 return self._totalExecutorSteps
105
106 @totalExecutorSteps.setter
107 def totalExecutorSteps(self, value):
108 self._totalExecutorSteps = value
109
110
113 def setFromTransform(self, trf):
114 self._argdict = trf.argdict
115 self._dataDictionary = trf.dataDictionary
116
117
118 def addToArgdict(self, key, value):
119 self._argdict[key] = value
120
121
122 def addToDataDictionary(self, key, value):
123 self._dataDictionary[key] = value
124
125
126
128
129
138 def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
139 # Some information to produce helpful log messages
140
141 self._name = forceToAlphaNum(name)
142 # Data this executor can start from and produce
143 # Note we transform NULL to inNULL and outNULL as a convenience
144 self._inData = set(inData)
145 self._outData = set(outData)
146 if 'NULL' in self._inData:
147 self._inData.remove('NULL')
148 self._inData.add('inNULL')
149 if 'NULL' in self._outData:
150 self._outData.remove('NULL')
151 self._outData.add('outNULL')
152
153 # It's forbidden for an executor to consume and produce the same datatype
154 dataOverlap = self._inData & self._outData
155 if len(dataOverlap) > 0:
156 raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
157 'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self._name, ' '.join(dataOverlap)))
158
159
162 if conf is not None:
163 self.conf = conf
164 else:
165 self.conf = executorConfig()
166 if trf is not None:
167 self.conf.setFromTransform(trf)
168
169 # Execution status
170 self._hasExecuted = False
171 self._rc = -1
172 self._errMsg = None
173
174 # Validation status
175 self._hasValidated = False
176 self._isValidated = False
177
178 # Extra metadata
179 # This dictionary holds extra metadata for this executor which will be
180 # provided in job reports
182
183
186 self._preExeStart = None
187 self._exeStart = self._exeStop = None
188 self._valStart = self._valStop = None
189 self._memStats = {}
191 self._memFullFile = None
192 self._eventCount = None
193 self._athenaMP = 0
194 self._athenaMT = 0
196 self._dbMonitor = None
197 self._resimevents = None
199 self._containerSetup = None
200
201 # Holder for execution information about any merges done by this executor in MP mode
202 self._myMerger = []
203
204
205
206 @property
207 def myMerger(self):
208 return self._myMerger
209
210 @property
211 def name(self):
212 return self._name
213
214 @name.setter
215 def name(self, value):
216 self._name = value
217
218 @property
219 def substep(self):
220 if '_substep' in dir(self):
221 return self._substep
222 return None
223
224 @property
225 def trf(self):
226 if '_trf' in dir(self):
227 return self._trf
228 return None
229
230 @trf.setter
231 def trf(self, value):
232 self._trf = value
233
234 @property
235 def inData(self):
236
237 if '_inData' in dir(self):
238 return self._inData
239 return None
240
241 @inData.setter
242 def inData(self, value):
243 self._inData = set(value)
244
245 def inDataUpdate(self, value):
246
247 if '_inData' in dir(self):
248 self._inData.update(value)
249 else:
250
251 self.inData = value
252
253
254 @property
255 def outData(self):
256
257 if '_outData' in dir(self):
258 return self._outData
259 return None
260
261 @outData.setter
262 def outData(self, value):
263 self._outData = set(value)
264
265 def outDataUpdate(self, value):
266
267 if '_outData' in dir(self):
268 self._outData.update(value)
269 else:
270
271 self.outData = value
272
273 @property
274
276 def input(self):
277
278 if '_input' in dir(self):
279 return self._input
280 return None
281
282 @property
283
285 def output(self):
286
287 if '_output' in dir(self):
288 return self._output
289 return None
290
291 @property
292 def extraMetadata(self):
293 return self._extraMetadata
294
295 @property
296 def hasExecuted(self):
297 return self._hasExecuted
298
299 @property
300 def rc(self):
301 return self._rc
302
303 @property
304 def errMsg(self):
305 return self._errMsg
306
307 @property
308 def validation(self):
309 return self._validation
310
311 @validation.setter
312 def validation(self, value):
313 self._validation = value
314
315 @property
316 def hasValidated(self):
317 return self._hasValidated
318
319 @property
320 def isValidated(self):
321 return self._isValidated
322
323
324 @property
325 def first(self):
326 if hasattr(self, '_first'):
327 return self._first
328 else:
329 return None
330
331 @property
333 return self._preExeStart
334
335 @property
336 def exeStartTimes(self):
337 return self._exeStart
338
339 @property
340 def exeStopTimes(self):
341 return self._exeStop
342
343 @property
344 def valStartTimes(self):
345 return self._valStart
346
347 @property
348 def valStopTimes(self):
349 return self._valStop
350
351 @property
352 def preExeCpuTime(self):
353 if self._preExeStart and self._exeStart:
354 return calcCpuTime(self._preExeStart, self._exeStart)
355 else:
356 return None
357
358 @property
359 def preExeWallTime(self):
360 if self._preExeStart and self._exeStart:
361 return calcWallTime(self._preExeStart, self._exeStart)
362 else:
363 return None
364
365 @property
366 def cpuTime(self):
367 if self._exeStart and self._exeStop:
368 return calcCpuTime(self._exeStart, self._exeStop)
369 else:
370 return None
371
372 @property
373 def usrTime(self):
374 if self._exeStart and self._exeStop:
375 return self._exeStop[2] - self._exeStart[2]
376 else:
377 return None
378
379 @property
380 def sysTime(self):
381 if self._exeStart and self._exeStop:
382 return self._exeStop[3] - self._exeStart[3]
383 else:
384 return None
385
386 @property
387 def wallTime(self):
388 if self._exeStart and self._exeStop:
389 return calcWallTime(self._exeStart, self._exeStop)
390 else:
391 return None
392
393 @property
394 def memStats(self):
395 return self._memStats
396
397 @property
398 def memAnalysis(self):
399 return self._memLeakResult
400
401 @property
402 def postExeCpuTime(self):
403 if self._exeStop and self._valStart:
404 return calcCpuTime(self._exeStop, self._valStart)
405 else:
406 return None
407
408 @property
410 if self._exeStop and self._valStart:
411 return calcWallTime(self._exeStop, self._valStart)
412 else:
413 return None
414
415 @property
417 if self._valStart and self._valStop:
418 return calcCpuTime(self._valStart, self._valStop)
419 else:
420 return None
421
422 @property
424 if self._valStart and self._valStop:
425 return calcWallTime(self._valStart, self._valStop)
426 else:
427 return None
428
429 @property
430 def cpuTimeTotal(self):
431 if self._preExeStart and self._valStop:
432 return calcCpuTime(self._preExeStart, self._valStop)
433 else:
434 return None
435
436 @property
437 def wallTimeTotal(self):
438 if self._preExeStart and self._valStop:
439 return calcWallTime(self._preExeStart, self._valStop)
440 else:
441 return None
442
443 @property
444 def eventCount(self):
445 return self._eventCount
446
447 @property
448 def reSimEvent(self):
449 return self._resimevents
450
451 @property
452 def athenaMP(self):
453 return self._athenaMP
454
455 @property
456 def dbMonitor(self):
457 return self._dbMonitor
458
459
460 # set start times, if not set already
461 def setPreExeStart(self):
462 if self._preExeStart is None:
463 self._preExeStart = os.times()
464 msg.debug('preExeStart time is {0}'.format(self._preExeStart))
465
466 def setValStart(self):
467 if self._valStart is None:
468 self._valStart = os.times()
469 msg.debug('valStart time is {0}'.format(self._valStart))
470
471 def preExecute(self, input = set(), output = set()):
472 self.setPreExeStart()
473 msg.info('Preexecute for %s', self._name)
474
475 def execute(self):
476 self._exeStart = os.times()
477 msg.debug('exeStart time is {0}'.format(self._exeStart))
478 msg.info('Starting execution of %s', self._name)
479 self._hasExecuted = True
480 self._rc = 0
481 self._errMsg = ''
482 msg.info('%s executor returns %d', self._name, self._rc)
483 self._exeStop = os.times()
484 msg.debug('preExeStop time is {0}'.format(self._exeStop))
485
486 def postExecute(self):
487 msg.info('Postexecute for %s', self._name)
488
489 def validate(self):
490 self.setValStart()
491 self._hasValidated = True
492 msg.info('Executor %s has no validation function - assuming all ok', self._name)
493 self._isValidated = True
494 self._errMsg = ''
495 self._valStop = os.times()
496 msg.debug('valStop time is {0}'.format(self._valStop))
497
498
499 def doAll(self, input=set(), output=set()):
500 self.preExecute(input, output)
501 self.execute()
502 self.postExecute()
503 self.validate()
504
505
507 def __init__(self, name = 'Logscan'):
508 super(logscanExecutor, self).__init__(name=name)
509 self._errorMaskFiles = None
510 self._logFileName = None
511
512 def preExecute(self, input = set(), output = set()):
513 self.setPreExeStart()
514 msg.info('Preexecute for %s', self._name)
515 if 'logfile' in self.conf.argdict:
516 self._logFileName = self.conf.argdict['logfile'].value
517
518 def validate(self):
519 self.setValStart()
520 msg.info("Starting validation for {0}".format(self._name))
521 if self._logFileName:
522
524 if 'ignorePatterns' in self.conf.argdict:
525 igPat = self.conf.argdict['ignorePatterns'].value
526 else:
527 igPat = []
528 if 'ignoreFiles' in self.conf.argdict:
529 ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
530 elif self._errorMaskFiles is not None:
531 ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
532 else:
533 ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)
534
535 # Now actually scan my logfile
536 msg.info('Scanning logfile {0} for errors'.format(self._logFileName))
537 self._logScan = trfValidation.athenaLogFileReport(logfile = self._logFileName, ignoreList = ignorePatterns)
538 worstError = self._logScan.worstError()
539
540 # In general we add the error message to the exit message, but if it's too long then don't do
541 # that and just say look in the jobReport
542 if worstError['firstError']:
543 if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
544 if 'CoreDumpSvc' in worstError['firstError']['message']:
545 exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
546 elif 'G4Exception' in worstError['firstError']['message']:
547 exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
548 else:
549 exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
550 else:
551 exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
552 else:
553 exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
554
555 # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
556 if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
557 msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
558 elif worstError['nLevel'] >= stdLogLevels['ERROR']:
559 self._isValidated = False
560 msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
561 raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
562 'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
563
564 # Must be ok if we got here!
565 msg.info('Executor {0} has validated successfully'.format(self.name))
566 self._isValidated = True
567 self._errMsg = ''
568
569 self._valStop = os.times()
570 msg.debug('valStop time is {0}'.format(self._valStop))
571
572
574 def __init__(self, name = 'Echo', trf = None):
575
576 # We are only changing the default name here
577 super(echoExecutor, self).__init__(name=name, trf=trf)
578
579
580 def execute(self):
581 self._exeStart = os.times()
582 msg.debug('exeStart time is {0}'.format(self._exeStart))
583 msg.info('Starting execution of %s', self._name)
584 msg.info('Transform argument dictionary now follows:')
585 for k, v in self.conf.argdict.items():
586 print("%s = %s" % (k, v))
587 self._hasExecuted = True
588 self._rc = 0
589 self._errMsg = ''
590 msg.info('%s executor returns %d', self._name, self._rc)
591 self._exeStop = os.times()
592 msg.debug('exeStop time is {0}'.format(self._exeStop))
593
594
596 def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
597
598 # We are only changing the default name here
599 super(dummyExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
600
601
602 def execute(self):
603 self._exeStart = os.times()
604 msg.debug('exeStart time is {0}'.format(self._exeStart))
605 msg.info('Starting execution of %s', self._name)
606 for type in self._outData:
607 for k, v in self.conf.argdict.items():
608 if type in k:
609 msg.info('Creating dummy output file: {0}'.format(self.conf.argdict[k].value[0]))
610 open(self.conf.argdict[k].value[0], 'a').close()
611 self._hasExecuted = True
612 self._rc = 0
613 self._errMsg = ''
614 msg.info('%s executor returns %d', self._name, self._rc)
615 self._exeStop = os.times()
616 msg.debug('exeStop time is {0}'.format(self._exeStop))
617
618
620 def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData = set(),
621 exe = None, exeArgs = None, memMonitor = True):
622 # Name of the script we want to execute
623 self._exe = exe
624
625 # With arguments (currently this means paste in the corresponding _argdict entry)
626 self._exeArgs = exeArgs
627
628 super(scriptExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
629
630 self._extraMetadata.update({'script' : exe})
631
632 # Decide if we echo script output to stdout
633 self._echoOutput = False
634
635 # Can either be written by base class or child
636 self._cmd = None
637
638 self._memMonitor = memMonitor
639
640 @property
641 def exe(self):
642 return self._exe
643
644 @exe.setter
645 def exe(self, value):
646 self._exe = value
647 self._extraMetadata['script'] = value
648
649 @property
650 def exeArgs(self):
651 return self._exeArgs
652
653 @exeArgs.setter
654 def exeArgs(self, value):
655 self._exeArgs = value
656# self._extraMetadata['scriptArgs'] = value
657
658 def preExecute(self, input = set(), output = set()):
659 self.setPreExeStart()
660 msg.debug('scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
661
662 self._input = input
663 self._output = output
664
665
666 if self._cmd is None:
668 msg.info('Will execute script as %s', self._cmd)
669
670 # Define this here to have it for environment detection messages
671 self._logFileName = "log.{0}".format(self._name)
672
673
675 if 'TRF_ECHO' in os.environ:
676 msg.info('TRF_ECHO envvar is set - enabling command echoing to stdout')
677 self._echoOutput = True
678 elif 'TRF_NOECHO' in os.environ:
679 msg.info('TRF_NOECHO envvar is set - disabling command echoing to stdout')
680 self._echoOutput = False
681 # PS1 is for sh, bash; prompt is for tcsh and zsh
682 elif isInteractiveEnv():
683 msg.info('Interactive environment detected (stdio or stdout is a tty) - enabling command echoing to stdout')
684 self._echoOutput = True
685 elif 'TZHOME' in os.environ:
686 msg.info('Tier-0 environment detected - enabling command echoing to stdout')
687 self._echoOutput = True
688 if self._echoOutput is False:
689 msg.info('Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self._name, self._logFileName))
690
691 # Now setup special loggers for logging execution messages to stdout and file
692 self._echologger = logging.getLogger(self._name)
693 self._echologger.setLevel(logging.INFO)
694 self._echologger.propagate = False
695
696 encargs = {'encoding' : 'utf-8'}
697 self._exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
698 self._exeLogFile.setFormatter(logging.Formatter('%(asctime)s %(message)s', datefmt='%H:%M:%S'))
699 self._echologger.addHandler(self._exeLogFile)
700
701 if self._echoOutput:
702 self._echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
703 self._echostream.setFormatter(logging.Formatter('%(name)s %(asctime)s %(message)s', datefmt='%H:%M:%S'))
704 self._echologger.addHandler(self._echostream)
705
707 if self._exe:
708 self._cmd = [self.exe, ]
709 else:
710 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
711 'No executor set in {0}'.format(self.__class__.__name__))
712 for arg in self.exeArgs:
713 if arg in self.conf.argdict:
714 # If we have a list then add each element to our list, else just str() the argument value
715 # Note if there are arguments which need more complex transformations then
716 # consider introducing a special toExeArg() method.
717 if isinstance(self.conf.argdict[arg].value, list):
718 self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
719 else:
720 self._cmd.append(str(self.conf.argdict[arg].value))
721
722
723 def execute(self):
724 self._hasExecuted = True
725 msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))
726
727 self._exeStart = os.times()
728 msg.debug('exeStart time is {0}'.format(self._exeStart))
729 if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
730 msg.info('execOnly flag is set - execution will now switch, replacing the transform')
731 os.execvp(self._cmd[0], self._cmd)
732
733 encargs = {'encoding' : 'utf8'}
734 try:
735 # if we are already inside a container, then
736 # must map /srv of the nested container to /srv of the parent container:
737 # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
738 if self._alreadyInContainer and self._containerSetup is not None:
739 msg.info("chdir /srv to launch a nested container for the substep")
740 os.chdir("/srv")
741 p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
742 # go back to the original working directory immediately after to store prmon in it
743 if self._alreadyInContainer and self._containerSetup is not None:
744 msg.info("chdir {} after launching the nested container".format(self._workdir))
745 os.chdir(self._workdir)
746
747 if self._memMonitor:
748 try:
749 self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
750 self._memFullFile = 'prmon.full.' + self._name
751 memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
752 '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
753 '--interval', '30']
754 mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
755 # TODO - link mem.full.current to mem.full.SUBSTEP
756 except Exception as e:
757 msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
758 self._memMonitor = False
759
760 while p.poll() is None:
761 line = p.stdout.readline()
762 if line:
763 self._echologger.info(line.rstrip())
764 # Hoover up remaining buffered output lines
765 for line in p.stdout:
766 self._echologger.info(line.rstrip())
767
768 self._rc = p.returncode
769 msg.info('%s executor returns %d', self._name, self._rc)
770 self._exeStop = os.times()
771 msg.debug('exeStop time is {0}'.format(self._exeStop))
772 except OSError as e:
773 errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
774 msg.error(errMsg)
775 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
776 finally:
777 if self._memMonitor:
778 try:
779 mem_proc.send_signal(signal.SIGUSR1)
780 countWait = 0
781 while (not mem_proc.poll()) and countWait < 10:
782 time.sleep(0.1)
783 countWait += 1
784 except OSError:
785 pass
786
787
788 def postExecute(self):
789 if hasattr(self._exeLogFile, 'close'):
790 self._exeLogFile.close()
791 if self._memMonitor:
792 try:
793 memFile = open(self._memSummaryFile)
794 self._memStats = json.load(memFile)
795 except Exception as e:
796 msg.warning('Failed to load JSON memory summmary file {0}: {1}'.format(self._memSummaryFile, e))
797 self._memMonitor = False
798 self._memStats = {}
799
800
801 def validate(self):
802 if self._valStart is None:
803 self._valStart = os.times()
804 msg.debug('valStart time is {0}'.format(self._valStart))
805 self._hasValidated = True
806
807
808 if self._rc == 0:
809 msg.info('Executor {0} validated successfully (return code {1})'.format(self._name, self._rc))
810 self._isValidated = True
811 self._errMsg = ''
812 else:
813 # Want to learn as much as possible from the non-zero code
814 # this is a bit hard in general, although one can do signals.
815 # Probably need to be more specific per exe, i.e., athena non-zero codes
816 self._isValidated = False
817 if self._rc < 0:
818 # Map return codes to what the shell gives (128 + SIGNUM)
819 self._rc = 128 - self._rc
820 if trfExit.codeToSignalname(self._rc) != "":
821 self._errMsg = '{0} got a {1} signal (exit code {2})'.format(self._name, trfExit.codeToSignalname(self._rc), self._rc)
822 else:
823 self._errMsg = 'Non-zero return code from %s (%d)' % (self._name, self._rc)
824 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_FAIL'), self._errMsg)
825
826
828 if 'checkEventCount' in self.conf.argdict and self.conf.argdict['checkEventCount'].returnMyValue(exe=self) is False:
829 msg.info('Event counting for substep {0} is skipped'.format(self.name))
830 else:
831 if 'mpi' in self.conf.argdict and not mpi.mpiShouldValidate():
832 msg.info('MPI mode -- skipping output event count check')
833 else:
834 checkcount=trfValidation.eventMatch(self)
835 checkcount.decide()
836 self._eventCount = checkcount.eventCount
837 msg.info('Event counting for substep {0} passed'.format(self.name))
838
839 self._valStop = os.times()
840 msg.debug('valStop time is {0}'.format(self._valStop))
841
842
843
845 _exitMessageLimit = 200 # Maximum error message length to report in the exitMsg
846 _defaultIgnorePatternFile = ['atlas_error_mask.db']
847
848
885 def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
886 inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'],
887 substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = {}, runtimeRunargs = {},
888 literalRunargs = [], dataArgs = [], checkEventCount = False, errorMaskFiles = None,
889 manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):
890
891 self._substep = forceToAlphaNum(substep)
892 self._inputEventTest = inputEventTest
893 self._tryDropAndReload = tryDropAndReload
894 self._extraRunargs = extraRunargs
895 self._runtimeRunargs = runtimeRunargs
896 self._literalRunargs = literalRunargs
897 self._dataArgs = dataArgs
898 self._errorMaskFiles = errorMaskFiles
899 self._inputDataTypeCountCheck = inputDataTypeCountCheck
900 self._disableMT = disableMT
901 self._disableMP = disableMP
902 self._onlyMP = onlyMP
903 self._onlyMT = onlyMT
904 self._onlyMPWithRunargs = onlyMPWithRunargs
905 self._skeletonCA=skeletonCA
906
907 if perfMonFile:
908 self._perfMonFile = None
909 msg.debug("Resource monitoring from PerfMon is now deprecated")
910
911 # SkeletonFile can be None (disable) or a string or a list of strings - normalise it here
912 if isinstance(skeletonFile, str):
913 self._skeleton = [skeletonFile]
914 else:
915 self._skeleton = skeletonFile
916
917 super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
918 exeArgs=exeArgs, memMonitor=memMonitor)
919
920 # Add athena specific metadata
921 self._extraMetadata.update({'substep': substep})
922
923 # Setup JO templates
924 if self._skeleton or self._skeletonCA:
925 self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
926 else:
927 self._jobOptionsTemplate = None
928
929 @property
931 return self._inputDataTypeCountCheck
932
933 @inputDataTypeCountCheck.setter
934 def inputDataTypeCountCheck(self, value):
935 self._inputDataTypeCountCheck = value
936
937 @property
938 def substep(self):
939 return self._substep
940
941 @property
942 def disableMP(self):
943 return self._disableMP
944
945 @disableMP.setter
946 def disableMP(self, value):
947 self._disableMP = value
948
949 @property
950 def disableMT(self):
951 return self._disableMT
952
953 @disableMT.setter
954 def disableMT(self, value):
955 self._disableMT = value
956
957 @property
958 def onlyMP(self):
959 return self._onlyMP
960
961 @onlyMP.setter
962 def onlyMP(self, value):
963 self._onlyMP = value
964
965 @property
966 def onlyMT(self):
967 return self._onlyMT
968
969 @onlyMT.setter
970 def onlyMT(self, value):
971 self._onlyMT = value
972
973 def preExecute(self, input = set(), output = set()):
974 self.setPreExeStart()
975 msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
976
977 # Check we actually have events to process!
978 inputEvents = 0
979 dt = ""
980 if self._inputDataTypeCountCheck is None:
981 self._inputDataTypeCountCheck = input
982 for dataType in self._inputDataTypeCountCheck:
983 if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
984 continue
985
986 thisInputEvents = self.conf.dataDictionary[dataType].nentries
987 if thisInputEvents > inputEvents:
988 inputEvents = thisInputEvents
989 dt = dataType
990
991 # Now take into account skipEvents and maxEvents
992 if ('skipEvents' in self.conf.argdict and
993 self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
994 mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
995 else:
996 mySkipEvents = 0
997
998 if ('maxEvents' in self.conf.argdict and
999 self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
1000 myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1001 else:
1002 myMaxEvents = -1
1003
1004 # Any events to process...?
1005 if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
1006 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
1007 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(mySkipEvents, inputEvents, dt))
1008
1009 try:
1010 # Expected events to process
1011 if (myMaxEvents != -1):
1012 if (self.inData and next(iter(self.inData)) == 'inNULL'):
1013 expectedEvents = myMaxEvents
1014 else:
1015 expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
1016 else:
1017 expectedEvents = inputEvents-mySkipEvents
1018 except TypeError:
1019 # catching type error from UNDEFINED inputEvents count
1020 msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
1021 expectedEvents = 0
1022
1023
1026 OSSetupString = None
1027
1028 # Extract the asetup string
1029 asetupString = None
1030 legacyThreadingRelease = False
1031 if 'asetup' in self.conf.argdict:
1032 asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1033 legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1034 else:
1035 msg.info('Asetup report: {0}'.format(asetupReport()))
1036
1037 if asetupString is not None:
1038 legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1039 currentOS = os.environ['ALRB_USER_PLATFORM']
1040 if legacyOSRelease and "centos7" not in currentOS:
1041 OSSetupString = "centos7"
1042 msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
1043
1044
1045 # allow overriding the container OS using a flag
1046 if 'runInContainer' in self.conf.argdict:
1047 OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1048 msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
1049 if OSSetupString is not None and asetupString is None:
1050 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1051 '--asetup must be used for the substep which requires --runInContainer')
1052
1053 # Conditional MP based on runtime arguments
1054 if self._onlyMPWithRunargs:
1055 for k in self._onlyMPWithRunargs:
1056 if k in self.conf._argdict:
1057 self._onlyMP = True
1058
1059 # Check the consistency of parallel configuration: CLI flags + evnironment.
1060 if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
1061 ('ATHENA_CORE_NUMBER' not in os.environ)):
1062 # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
1063 msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1064 else:
1065 # Try to detect AthenaMT mode, number of threads and number of concurrent events
1066 if not self._disableMT:
1067 self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1068
1069 # Try to detect AthenaMP mode and number of workers
1070 if not self._disableMP:
1071 self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
1072
1073 # Check that we actually support MT
1074 if self._onlyMP and self._athenaMT > 0:
1075 msg.info("This configuration does not support MT, falling back to MP")
1076 if self._athenaMP == 0:
1077 self._athenaMP = self._athenaMT
1078 self._athenaMT = 0
1080
1081 # Check that we actually support MP
1082 if self._onlyMT and self._athenaMP > 0:
1083 msg.info("This configuration does not support MP, using MT")
1084 if self._athenaMT == 0:
1085 self._athenaMT = self._athenaMP
1087 self._athenaMP = 0
1088
1089 # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
1090 # which also avoids issues with zero sized files. Distinguish from the no-input case (e.g. evgen)
1091 if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
1092 msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
1093 self._disableMP = True
1094 self._athenaMP = 0
1095
1096 # Handle executor steps
1097 if self.conf.totalExecutorSteps > 1:
1098 for dataType in output:
1099 if self.conf._dataDictionary[dataType].originalName:
1100 self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1101 else:
1102 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1103 self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
1104 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1105
1106 # And if this is (still) athenaMP, then set some options for workers and output file report
1107 if self._athenaMP > 0:
1108 self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
1109 self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
1110 self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
1111 if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
1113 else:
1114 self._athenaMPReadEventOrders = False
1115 # Decide on scheduling
1116 if ('athenaMPStrategy' in self.conf.argdict and
1117 (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
1118 self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1119 else:
1120 self._athenaMPStrategy = 'SharedQueue'
1121
1122 # See if we have options for the target output file size
1123 if 'athenaMPMergeTargetSize' in self.conf.argdict:
1124 for dataType in output:
1125 if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
1126 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
1127 msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1128 else:
1129 # Use a globbing strategy
1130 matchedViaGlob = False
1131 for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
1132 if fnmatch(dataType, mtsType):
1133 self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
1134 msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1135 matchedViaGlob = True
1136 break
1137 if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
1138 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
1139 msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1140
1141 # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
1142 # so that the mother process output file (if it exists) can be used directly
1143 # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
1144 for dataType in output:
1145 if self.conf.totalExecutorSteps <= 1:
1146 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1147 if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
1148 if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
1149 msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
1150 else:
1151 self.conf._dataDictionary[dataType].value[0] += "_000"
1152 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1153 else:
1155
1156
1157 if 'mpi' in self.conf.argdict:
1158 msg.info("Running in MPI mode")
1159 mpi.setupMPIConfig(output, self.conf.dataDictionary)
1160
1161
1162 if self._skeleton or self._skeletonCA:
1163 inputFiles = dict()
1164 for dataType in input:
1165 inputFiles[dataType] = self.conf.dataDictionary[dataType]
1166 outputFiles = dict()
1167 for dataType in output:
1168 outputFiles[dataType] = self.conf.dataDictionary[dataType]
1169
1170 # See if we have any 'extra' file arguments
1171 nameForFiles = commonExecutorStepName(self._name)
1172 for dataType, dataArg in self.conf.dataDictionary.items():
1173 if isinstance(dataArg, list) and dataArg:
1174 if self.conf.totalExecutorSteps <= 1:
1175 raise ValueError('Multiple input arguments provided but only running one substep')
1176 if self.conf.totalExecutorSteps != len(dataArg):
1177 raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1178
1179 if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
1180 inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1181 else:
1182 if dataArg.io == 'input' and nameForFiles in dataArg.executor:
1183 inputFiles[dataArg.subtype] = dataArg
1184
1185 msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1186
1187 # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
1188 self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1189 output = outputFiles)
1190
1191
1193 if len(input) > 0:
1194 self._extraMetadata['inputs'] = list(input)
1195 if len(output) > 0:
1196 self._extraMetadata['outputs'] = list(output)
1197
1198
1199 dbrelease = dbsetup = None
1200 if 'DBRelease' in self.conf.argdict:
1201 dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1202 if path.islink(dbrelease):
1203 dbrelease = path.realpath(dbrelease)
1204 if dbrelease:
1205 # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
1206 dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1207 if dbdMatch:
1208 msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1209 if not os.access(dbrelease, os.R_OK):
1210 msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1211 msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1212 dbrelease = dbdMatch.group(1)
1213 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1214 else:
1215 # Check if the DBRelease is setup
1216 msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1217 unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1218 if unpacked:
1219 # Now run the setup.py script to customise the paths to the current location...
1220 setupDBRelease(dbsetup)
1221 # For cvmfs we want just the X.Y.Z release string (and also support 'current')
1222 else:
1223 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1224
1225
1227 self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1229
1230
1231 super(athenaExecutor, self).preExecute(input, output)
1232
1233 # Now we always write a wrapper, because it's very convenient for re-running individual substeps
1234 # This will have asetup and/or DB release setups in it
1235 # Do this last in this preExecute as the _cmd needs to be finalised
1236 msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
1237 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1238 msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1239
1240
1241 def postExecute(self):
1242 super(athenaExecutor, self).postExecute()
1243 # MPI merging
1244 if 'mpi' in self.conf.argdict:
1245 mpi.mergeOutputs()
1246
1247 # Handle executor substeps
1248 if self.conf.totalExecutorSteps > 1:
1249 if self._athenaMP > 0:
1250 outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1251 athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
1252 if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
1253 # first loop over datasets for the output
1254 for dataType in self._output:
1255 newValue = []
1256 if self._athenaMP > 0:
1257 # assume the same number of workers all the time
1258 for i in range(self.conf.totalExecutorSteps):
1259 for v in self.conf.dataDictionary[dataType].value:
1260 newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
1261 '_{0}{1}_'.format(executorStepSuffix, i)))
1262 else:
1263 self.conf.dataDictionary[dataType].multipleOK = True
1264 # just combine all executors
1265 for i in range(self.conf.totalExecutorSteps):
1266 newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
1267 self.conf.dataDictionary[dataType].value = newValue
1268
1269 # do the merging if needed
1270 if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1271 self._smartMerge(self.conf.dataDictionary[dataType])
1272
1273 # If this was an athenaMP run then we need to update output files
1274 elif self._athenaMP > 0:
1275 outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1276
1277 skipFileChecks=False
1278 if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
1279 skipFileChecks=True
1280 athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
1281 for dataType in self._output:
1282 if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1283 self._smartMerge(self.conf.dataDictionary[dataType])
1284
1285 if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
1286 self._targzipJiveXML()
1287
1288 # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
1289 # This is a bit ugly to have such a specific feature here though
1290 # TODO
1291 # The best is to have a general approach so that user can extract useful info from log
1292 # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
1293 # in which the wanted variable is grouped by a name, then transforms could decode the pattern
1294 # and use it to extract required info and do the summation during log scan.
1295 if self._logFileName=='log.ReSim' and self.name=='ReSim':
1296 msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
1297 self._resimevents = reportEventsPassedSimFilter(self._logFileName)
1298
1299 # Remove intermediate input/output files of sub-steps
1300 # Delete only files with io="temporay" which are files with pattern "tmp*"
1301 # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
1302 # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
1303 # Enable if --deleteIntermediateOutputfiles is set
1304 if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
1305 inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])
1306
1307 for k, v in inputDataDictionary.items():
1308 if not v.io == 'temporary':
1309 continue
1310 for filename in v.value:
1311 if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
1312 msg.info("Removing intermediate {0} input file {1}".format(k, filename))
1313 # Check if symbolic link and delete also linked file
1314 if (os.path.realpath(filename) != filename):
1315 targetpath = os.path.realpath(filename)
1316 os.unlink(filename)
1317 if (targetpath) and os.access(targetpath, os.R_OK):
1318 os.unlink(targetpath)
1319
1320
1321 def validate(self):
1322 self.setValStart()
1323 self._hasValidated = True
1324 deferredException = None
1325 memLeakThreshold = 5000
1326 _hasMemLeak = False
1327
1328
1329 try:
1330 super(athenaExecutor, self).validate()
1332 # In this case we hold this exception until the logfile has been scanned
1333 msg.error('Validation of return code failed: {0!s}'.format(e))
1334 deferredException = e
1335
1336
1344 msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
1345 self._memLeakResult = analytic().getFittedData(self._memFullFile)
1346 if self._memLeakResult:
1347 if self._memLeakResult['slope'] > memLeakThreshold:
1348 _hasMemLeak = True
1349 msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
1350 else:
1351 msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
1352 else:
1353 msg.info('No memory monitor file to be analysed')
1354
1355 # Logfile scan setup
1356 # Always use ignorePatterns from the command line
1357 # For patterns in files, pefer the command line first, then any special settings for
1358 # this executor, then fallback to the standard default (atlas_error_mask.db)
1359 if 'ignorePatterns' in self.conf.argdict:
1360 igPat = self.conf.argdict['ignorePatterns'].value
1361 else:
1362 igPat = []
1363 if 'ignoreFiles' in self.conf.argdict:
1364 ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
1365 elif self._errorMaskFiles is not None:
1366 ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
1367 else:
1368 ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)
1369
1370 # Now actually scan my logfile
1371 msg.info('Scanning logfile {0} for errors in substep {1}'.format(self._logFileName, self._substep))
1373 ignoreList=ignorePatterns)
1374 worstError = self._logScan.worstError()
1375 eventLoopWarnings = self._logScan.eventLoopWarnings()
1377
1378
1379 # In general we add the error message to the exit message, but if it's too long then don't do
1380 # that and just say look in the jobReport
1381 if worstError['firstError']:
1382 if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
1383 if 'CoreDumpSvc' in worstError['firstError']['message']:
1384 exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1385 elif 'G4Exception' in worstError['firstError']['message']:
1386 exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1387 else:
1388 exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
1389 else:
1390 exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
1391 else:
1392 exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
1393
1394 # If we failed on the rc, then abort now
1395 if deferredException is not None:
1396 # Add any logfile information we have
1397 if worstError['nLevel'] >= stdLogLevels['ERROR']:
1398 deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
1399 # Add the result of memory analysis
1400 if _hasMemLeak:
1401 deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1402 raise deferredException
1403
1404
1405 # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
1406 if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
1407 msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
1408 # Act as if ignoreErrors=True if running in MPI, because we want to be tolerant to the occasional event failure
1409 elif worstError['nLevel'] >= stdLogLevels['ERROR'] and (not mpi.mpiShouldValidate()):
1410 msg.warning(f'Found {worstError['level']} in the logfile in MPI rank {mpi.getMPIRank()} but moving on to be failure-tolerant')
1411 elif worstError['nLevel'] >= stdLogLevels['ERROR']:
1412 self._isValidated = False
1413 msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
1414 # Add the result of memory analysis
1415 if _hasMemLeak:
1416 exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1417 raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
1418 'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
1419
1420 # Print event loop warnings
1421 if (len(eventLoopWarnings) > 0):
1422 msg.warning('Found WARNINGS in the event loop, as follows:')
1423 for element in eventLoopWarnings:
1424 msg.warning('{0} {1} ({2} instances)'.format(element['item']['service'],element['item']['message'],element['count']))
1425
1426 # Must be ok if we got here!
1427 msg.info('Executor {0} has validated successfully'.format(self.name))
1428 self._isValidated = True
1429
1430 self._valStop = os.times()
1431 msg.debug('valStop time is {0}'.format(self._valStop))
1432
1433
1434 def _isCAEnabled(self):
1435 # CA not present
1436 if 'CA' not in self.conf.argdict:
1437 # If there is no legacy skeleton, then we are running with CA
1438 if not self._skeleton:
1439 return True
1440 else:
1441 return False
1442
1443 # CA present but None, all substeps running with CA
1444 if self.conf.argdict['CA'] is None:
1445 return True
1446
1447 # CA enabled for a substep, running with CA
1448 if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
1449 return True
1450
1451 return False
1452
1453
1455
1458 if 'athena' in self.conf.argdict:
1459 self._exe = self.conf.argdict['athena'].value
1460 self._cmd = [self._exe]
1461
1462 # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
1463 currentSubstep = None
1464 if 'athenaopts' in self.conf.argdict:
1465 currentName = commonExecutorStepName(self.name)
1466 if currentName in self.conf.argdict['athenaopts'].value:
1467 currentSubstep = currentName
1468 if self.substep in self.conf.argdict['athenaopts'].value:
1469 msg.info('Athenaopts found for {0} and {1}, joining options. '
1470 'Consider changing your configuration to use just the name or the alias of the substep.'
1471 .format(currentSubstep, self.substep))
1472 self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
1473 del self.conf.argdict['athenaopts'].value[self.substep]
1474 msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
1475 elif self.substep in self.conf.argdict['athenaopts'].value:
1476 currentSubstep = self.substep
1477 elif 'all' in self.conf.argdict['athenaopts'].value:
1478 currentSubstep = 'all'
1479
1480 # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
1481 preLoadUpdated = dict()
1482 if 'LD_PRELOAD' in self._envUpdate._envdict:
1483 preLoadUpdated[currentSubstep] = False
1484 if 'athenaopts' in self.conf.argdict:
1485 if currentSubstep is not None:
1486 for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
1487 # This code is pretty ugly as the athenaopts argument contains
1488 # strings which are really key/value pairs
1489 if athArg.startswith('--preloadlib'):
1490 try:
1491 i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
1492 v = athArg.split('=', 1)[1]
1493 msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1494 newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
1495 self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
1496 except Exception as e:
1497 msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
1498 preLoadUpdated[currentSubstep] = True
1499 break
1500 if not preLoadUpdated[currentSubstep]:
1501 msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1502 if 'athenaopts' in self.conf.argdict:
1503 if currentSubstep is not None:
1504 self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
1505 else:
1506 self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
1507 else:
1508 self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])
1509
1510 # Now update command line with the options we have (including any changes to preload)
1511 if 'athenaopts' in self.conf.argdict:
1512 if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
1513 self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
1514 elif currentSubstep in self.conf.argdict['athenaopts'].value:
1515 self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])
1516
1517 if currentSubstep is None:
1518 currentSubstep = 'all'
1519
1520 if self._tryDropAndReload:
1521 if self._isCAEnabled():
1522 msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
1523 elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1524 msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
1525 elif 'athenaopts' in self.conf.argdict:
1526 athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
1527 # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
1528 if currentSubstep in self.conf.argdict['athenaopts'].value:
1529 conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
1530 if len(conflictOpts) > 0:
1531 msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
1532 else:
1533 msg.info('Appending "--drop-and-reload" to athena options')
1534 self._cmd.append('--drop-and-reload')
1535 else:
1536 msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
1537 self._cmd.append('--drop-and-reload')
1538 else:
1539 # This is the 'standard' case - so drop and reload should be ok
1540 msg.info('Appending "--drop-and-reload" to athena options')
1541 self._cmd.append('--drop-and-reload')
1542 else:
1543 msg.info('Skipping test for "--drop-and-reload" in this executor')
1544
1545 if not self._isCAEnabled(): #For CA-jobs, threads and nproc set in runargs file
1546 # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
1547 if self._athenaMT > 0 and not self._disableMT:
1548 if not ('athenaopts' in self.conf.argdict and
1549 any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1550 self._cmd.append('--threads=%s' % str(self._athenaMT))
1551
1552 # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
1553 if self._athenaMP > 0 and not self._disableMP:
1554 if not ('athenaopts' in self.conf.argdict and
1555 any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1556 self._cmd.append('--nprocs=%s' % str(self._athenaMP))
1557
1558 # Add topoptions
1559 # Note that _writeAthenaWrapper removes this from the end of _cmd when preparing the options for VTuneCommand, so assumes it comes last.
1560 if self._skeleton or self._skeletonCA:
1561 self._cmd += self._topOptionsFiles
1562 msg.info('Updated script arguments with topoptions: %s', self._cmd)
1563
1564
1565
1567 self,
1568 asetup = None,
1569 dbsetup = None,
1570 ossetup = None
1571 ):
1572 self._originalCmd = self._cmd
1573 self._asetup = asetup
1574 self._dbsetup = dbsetup
1575 self._containerSetup = ossetup
1576 self._workdir = os.getcwd()
1577 self._alreadyInContainer = self._workdir.startswith("/srv")
1578 self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
1579 self._setupFile = 'setup.{name}.sh'.format(name = self._name)
1580
1581 # Create a setupATLAS script
1582 setupATLAS = 'my_setupATLAS.sh'
1583 with open(setupATLAS, 'w') as f:
1584 print("#!/bin/bash", file=f)
1585 print("""
1586if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1587 export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1588fi
1589source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1590 , file=f)
1591 os.chmod(setupATLAS, 0o755)
1592
1593 msg.debug(
1594 'Preparing wrapper file {wrapperFileName} with '
1595 'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1596 wrapperFileName = self._wrapperFile,
1597 asetupStatus = self._asetup,
1598 dbsetupStatus = self._dbsetup
1599 )
1600 )
1601
1602 container_cmd = None
1603 try:
1604 with open(self._wrapperFile, 'w') as wrapper:
1605 print('#!/bin/sh', file=wrapper)
1606 if self._containerSetup is not None:
1607 container_cmd = [ os.path.abspath(setupATLAS),
1608 "-c",
1609 self._containerSetup,
1610 "--pwd",
1611 self._workdir,
1612 "-s",
1613 os.path.join('.', self._setupFile),
1614 "-r"]
1615 print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1616 print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
1617 print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1618 file = wrapper)
1619 print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1620 file=wrapper)
1621 print('echo', file=wrapper)
1622
1623 if asetup:
1624 wfile = wrapper
1625 asetupFile = None
1626 # if the substep is executed within a container, setup athena with a separate script
1627 # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
1628 if self._containerSetup is not None:
1629 asetupFile = open(self._setupFile, 'w')
1630 wfile = asetupFile
1631 print(f'source ./{setupATLAS} -q', file=wfile)
1632 print(f'asetup {asetup}', file=wfile)
1633 print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1634 if dbsetup:
1635 dbroot = path.dirname(dbsetup)
1636 dbversion = path.basename(dbroot)
1637 print("# DBRelease setup", file=wrapper)
1638 print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1639 print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1640 print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1641 print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1642 print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
1643 print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1644 if self._disableMT:
1645 print("# AthenaMT explicitly disabled for this executor", file=wrapper)
1646 if self._disableMP:
1647 print("# AthenaMP explicitly disabled for this executor", file=wrapper)
1648 if self._envUpdate.len > 0:
1649 for envSetting in self._envUpdate.values:
1650 if not envSetting.startswith('LD_PRELOAD'):
1651 print("export", envSetting, file=wrapper)
1652 # If Valgrind is engaged, a serialised Athena configuration file
1653 # is generated for use with a subsequent run of Athena with
1654 # Valgrind.
1655 if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1656 msg.info('Valgrind engaged')
1657 # Define the file name of the serialised Athena
1658 # configuration.
1659 AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1660 name = self._name
1661 )
1662 # Run Athena for generation of its serialised configuration.
1663 print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1664 print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1665 # Generate a Valgrind command, suppressing or ussing default
1666 # options as requested and extra options as requested.
1667 if 'valgrindDefaultOpts' in self.conf._argdict:
1668 defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
1669 else:
1670 defaultOptions = True
1671 if 'valgrindExtraOpts' in self.conf._argdict:
1672 extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
1673 else:
1674 extraOptionsList = None
1675 msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
1676 msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1677 command = ValgrindCommand(
1678 defaultOptions = defaultOptions,
1679 extraOptionsList = extraOptionsList,
1680 AthenaSerialisedConfigurationFile = \
1681 AthenaSerialisedConfigurationFile
1682 )
1683 msg.debug("Valgrind command: {command}".format(command = command))
1684 print(command, file=wrapper)
1685 # If VTune is engaged, a serialised Athena configuration file
1686 # is generated for use with a subsequent run of Athena with
1687 # VTune.
1688 elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
1689 msg.info('VTune engaged')
1690 # Define the file name of the serialised Athena
1691 # configuration.
1692 AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1693 name = self._name
1694 )
1695 # Run Athena for generation of its serialised configuration.
1696 print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1697 print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1698 # Generate a VTune command, suppressing or ussing default
1699 # options as requested and extra options as requested.
1700 if 'vtuneDefaultOpts' in self.conf._argdict:
1701 defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
1702 else:
1703 defaultOptions = True
1704 if 'vtuneExtraOpts' in self.conf._argdict:
1705 extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
1706 else:
1707 extraOptionsList = None
1708
1709 # replace the _topOptionsFiles from the Athena command with the AthenaSerialisedConfigurationFile.
1710 if (self._skeleton or self._skeletonCA) and len(self._topOptionsFiles) > 0:
1711 AthenaCommand = self._cmd[:-len(self._topOptionsFiles)]
1712 else:
1713 AthenaCommand = self._cmd
1714 AthenaCommand.append(AthenaSerialisedConfigurationFile)
1715
1716 msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
1717 msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
1718 command = VTuneCommand(
1719 defaultOptions = defaultOptions,
1720 extraOptionsList = extraOptionsList,
1721 AthenaCommand = AthenaCommand
1722 )
1723 msg.debug("VTune command: {command}".format(command = command))
1724 print(command, file=wrapper)
1725 else:
1726 msg.info('Valgrind/VTune not engaged')
1727 # run Athena command
1728 print(' '.join(self._cmd), file=wrapper)
1729 os.chmod(self._wrapperFile, 0o755)
1730 except OSError as e:
1731 errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
1732 fileName = self._wrapperFile,
1733 error = e
1734 )
1735 msg.error(errMsg)
1737 trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
1738 errMsg
1739 )
1740 self._cmd = [ path.join('.', self._wrapperFile) ]
1741 if self._containerSetup is not None:
1742 asetupFile.close()
1743 self._cmd = container_cmd + self._cmd
1744
1745
1746
1748 def _smartMerge(self, fileArg):
1749
1750 if 'selfMerge' not in dir(fileArg):
1751 msg.info('Files in {0} cannot merged (no selfMerge() method is implemented)'.format(fileArg.name))
1752 return
1753
1754 if fileArg.mergeTargetSize == 0:
1755 msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
1756 return
1757
1758
1759 mergeCandidates = [list()]
1760 currentMergeSize = 0
1761 for fname in fileArg.value:
1762 size = fileArg.getSingleMetadata(fname, 'file_size')
1763 if not isinstance(size, int):
1764 msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
1765 return
1766 # if there is no file in the job, then we must add it
1767 if len(mergeCandidates[-1]) == 0:
1768 msg.debug('Adding file {0} to current empty merge list'.format(fname))
1769 mergeCandidates[-1].append(fname)
1770 currentMergeSize += size
1771 continue
1772 # see if adding this file gets us closer to the target size (but always add if target size is negative)
1773 if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
1774 msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
1775 mergeCandidates[-1].append(fname)
1776 currentMergeSize += size
1777 continue
1778 # close this merge list and start a new one
1779 msg.debug('Starting a new merge list with file {0}'.format(fname))
1780 mergeCandidates.append([fname])
1781 currentMergeSize = size
1782
1783 msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
1784
1785 if len(mergeCandidates) == 1:
1786 # Merging to a single file, so use the original filename that the transform
1787 # was started with
1788 mergeNames = [fileArg.originalName]
1789 else:
1790 # Multiple merge targets, so we need a set of unique names
1791 counter = 0
1792 mergeNames = []
1793 for mergeGroup in mergeCandidates:
1794 # Note that the individual worker files get numbered with 3 digit padding,
1795 # so these non-padded merges should be fine
1796 mergeName = fileArg.originalName + '_{0}'.format(counter)
1797 while path.exists(mergeName):
1798 counter += 1
1799 mergeName = fileArg.originalName + '_{0}'.format(counter)
1800 mergeNames.append(mergeName)
1801 counter += 1
1802 # Now actually do the merges
1803 for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
1804 msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
1805 if len(mergeGroup) <= 1:
1806 msg.info('Skip merging for single file')
1807 else:
1808
1809 self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
1810
1811
1813 #tgzipping JiveXML files
1814 targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
1815 if os.path.exists(targetTGZName):
1816 os.remove(targetTGZName)
1817
1818 import tarfile
1819 fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")
1820
1821 # force gz compression
1822 tar = tarfile.open(targetTGZName, "w:gz")
1823 for fName in os.listdir('.'):
1824 matches = fNameRE.findall(fName)
1825 if len(matches) > 0:
1826 if fNameRE.findall(fName)[0] == fName:
1827 msg.info('adding %s to %s', fName, targetTGZName)
1828 tar.add(fName)
1829
1830 tar.close()
1831 msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
1832
1833
1834
1836
1837 # Here we validate, but will suppress any errors
1838 def validate(self):
1839 self.setValStart()
1840 try:
1841 super(optionalAthenaExecutor, self).validate()
1843 # In this case we hold this exception until the logfile has been scanned
1844 msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
1845 self._isValidated = False
1846 self._errMsg = e.errMsg
1847 self._rc = e.errCode
1848 self._valStop = os.times()
1849 msg.debug('valStop time is {0}'.format(self._valStop))
1850
1851
1853
1864 def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
1865 inData = set(), outData = set(), exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True,
1866 perfMonFile = None, tryDropAndReload = True, extraRunargs = {},
1867 manualDataDictionary = None, memMonitor = True):
1868
1869 super(POOLMergeExecutor, self).__init__(name, trf=trf, conf=conf, skeletonFile=skeletonFile, skeletonCA=skeletonCA,
1870 inData=inData, outData=outData, exe=exe, exeArgs=exeArgs, substep=substep,
1871 inputEventTest=inputEventTest, perfMonFile=perfMonFile,
1872 tryDropAndReload=tryDropAndReload, extraRunargs=extraRunargs,
1873 manualDataDictionary=manualDataDictionary, memMonitor=memMonitor)
1874
1875 def preExecute(self, input = set(), output = set()):
1876 self.setPreExeStart()
1877 super(POOLMergeExecutor, self).preExecute(input=input, output=output)
1878
1879
1880 def execute(self):
1881 # First call the parent executor, which will manage the athena execution for us
1882 super(POOLMergeExecutor, self).execute()
1883
1884
1885
1888
1890 def preExecute(self, input=set(), output=set()):
1891 self.setPreExeStart()
1892 msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
1893 if 'NTUP_PILEUP' not in output:
1894 # New derivation framework transform uses "formats"
1895 if 'formats' not in self.conf.argdict:
1896 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
1897 'No derivation configuration specified')
1898
1899 if ('DAOD' not in output) and ('D2AOD' not in output):
1900 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
1901 'No base name for DAOD output')
1902
1903 formatList = []
1904 if 'formats' in self.conf.argdict: formatList = self.conf.argdict['formats'].value
1905 for reduction in formatList:
1906 if ('DAOD' in output):
1907 dataType = 'DAOD_' + reduction
1908 if 'augmentations' not in self.conf.argdict:
1909 outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
1910 else:
1911 for val in self.conf.argdict['augmentations'].value:
1912 if reduction in val.split(':')[0]:
1913 outputName = 'DAOD_' + val.split(':')[1] + '.' + self.conf.argdict['outputDAODFile'].value[0]
1914 break
1915 else:
1916 outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
1917
1918 if ('D2AOD' in output):
1919 dataType = 'D2AOD_' + reduction
1920 outputName = 'D2AOD_' + reduction + '.' + self.conf.argdict['outputD2AODFile'].value[0]
1921
1922 msg.info('Adding reduction output type {0}'.format(dataType))
1923 output.add(dataType)
1924 newReduction = trfArgClasses.argPOOLFile(outputName, io='output', runarg=True, type='AOD',
1925 name=reduction)
1926 # References to _trf - can this be removed?
1927 self.conf.dataDictionary[dataType] = newReduction
1928
1929 # Clean up the stub file from the executor input and the transform's data dictionary
1930 # (we don't remove the actual argFile instance)
1931 if ('DAOD' in output):
1932 output.remove('DAOD')
1933 del self.conf.dataDictionary['DAOD']
1934 del self.conf.argdict['outputDAODFile']
1935 if ('D2AOD' in output):
1936 output.remove('D2AOD')
1937 del self.conf.dataDictionary['D2AOD']
1938 del self.conf.argdict['outputD2AODFile']
1939
1940 msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
1941 msg.info('Input/Output: {0}/{1}'.format(input, output))
1942
1943 msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
1944 msg.info('Input/Output: {0}/{1}'.format(input, output))
1945 super(reductionFrameworkExecutor, self).preExecute(input, output)
1946
1947
1948
1950 def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']),
1951 exe='DQHistogramMerge.py', exeArgs = [], memMonitor = True):
1952
1953 self._histMergeList = 'HISTMergeList.txt'
1954
1955 super(DQMergeExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
1956 exeArgs=exeArgs, memMonitor=memMonitor)
1957
1958
1959 def preExecute(self, input = set(), output = set()):
1960 self.setPreExeStart()
1961 msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
1962
1963 super(DQMergeExecutor, self).preExecute(input=input, output=output)
1964
1965 # Write the list of files to be merged
1966 with open(self._histMergeList, 'w') as DQMergeFile:
1967 for dataType in input:
1968 for fname in self.conf.dataDictionary[dataType].value:
1969 self.conf.dataDictionary[dataType]._getNumberOfEvents([fname])
1970 print(fname, file=DQMergeFile)
1971
1972 self._cmd.append(self._histMergeList)
1973
1974 # Add the output file
1975 if len(output) != 1:
1976 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1977 'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
1978 outDataType = list(output)[0]
1979 self._cmd.append(self.conf.dataDictionary[outDataType].value[0])
1980
1981 # Set the run_post_processing to True/False
1982 if (self.conf._argdict.get("run_post_processing",False)):
1983 self._cmd.append('True')
1984 else:
1985 self._cmd.append('False')
1986
1987 if (self.conf._argdict.get("is_incremental_merge",False)):
1988 self._cmd.append('True')
1989 else:
1990 self._cmd.append('False')
1991
1992 for k in ("excludeHist","excludeDir"):
1993 if k in self.conf._argdict:
1994 self._cmd.append("--{0}={1}".format(k,self.conf._argdict[k]))
1995
1996
1997 def validate(self):
1998 self.setValStart()
1999 super(DQMergeExecutor, self).validate()
2000
2001 exitErrorMessage = ''
2002 # Base class validation successful, Now scan the logfile for missed errors.
2003 try:
2005 worstError = logScan.worstError()
2006
2007 # In general we add the error message to the exit message, but if it's too long then don't do
2008 # that and just say look in the jobReport
2009 if worstError['firstError']:
2010 if len(worstError['firstError']['message']) > logScan._msgLimit:
2011 exitErrorMessage = "Long {0} message at line {1}" \
2012 " (see jobReport for further details)".format(worstError['level'],
2013 worstError['firstError']['firstLine'])
2014 else:
2015 exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
2016 worstError['firstError']['message'])
2017 except OSError as e:
2018 exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2020 'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))
2021
2022 if worstError['nLevel'] == stdLogLevels['ERROR'] and (
2023 'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
2024 msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2025
2026 elif worstError['nLevel'] >= stdLogLevels['ERROR']:
2027 self._isValidated = False
2028 msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
2029 exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2030 raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))
2031
2032 # Must be ok if we got here!
2033 msg.info('Executor {0} has validated successfully'.format(self.name))
2034 self._isValidated = True
2035
2036 self._valStop = os.times()
2037 msg.debug('valStop time is {0}'.format(self._valStop))
2038
2039
2041 def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']),
2042 exe='DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor = True):
2043
2044 self._histMergeList = 'HISTMergeList.txt'
2045
2046 super(DQMPostProcessExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
2047 exeArgs=exeArgs, memMonitor=memMonitor)
2048
2049
2050 def preExecute(self, input = set(), output = set()):
2051 self.setPreExeStart()
2052 msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
2053
2054 super(DQMPostProcessExecutor, self).preExecute(input=input, output=output)
2055
2056 #build input file list (typically only one):
2057 dsName=self.conf.argdict["inputHISTFile"].dataset
2058 inputList=[]
2059 for dataType in input:
2060 for fname in self.conf.dataDictionary[dataType].value:
2061 #if no dataset name is give, guess it from file name
2062 if not dsName: dsName=".".join(fname.split('.')[0:4])
2063 inputList.append("#".join([dsName,fname]))
2064
2065
2066 if len(output) != 1:
2067 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
2068 'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
2069 outDataType = list(output)[0]
2070 #build argument json:
2071 #T0 uses keys: 'allowCOOLUpload', 'doWebDisplay', 'incrementalMode', 'inputHistFiles', 'mergeParams', 'outputHistFile', 'postProcessing', 'reverseFileOrder', 'servers'
2072 #more keys: runNumber, streamName,projectTag,filepaths,productionMode,skipMerge
2073 wrapperParams={"inputHistFiles" : inputList,
2074 "outputHistFile" : dsName+"#"+self.conf.dataDictionary[outDataType].value[0],
2075 "incrementalMode": "True" if self.conf._argdict.get("is_incremental_merge",False) else "False",
2076 "postProcessing" : "True" if self.conf._argdict.get("run_post_processing",False) else "False",
2077 "doWebDisplay" : "True" if self.conf._argdict.get("doWebDisplay",False) else "False",
2078 "allowCOOLUpload": "True" if self.conf._argdict.get("allowCOOLUpload",False) else "False",
2079 "mergeParams" : "",
2080
2081 }
2082 if "servers" in self.conf._argdict:
2083 wrapperParams["server"]=self.conf._argdict["servers"]
2084
2085 for k in ("excludeHist","excludeDir"):
2086 if k in self.conf._argdict:
2087 wrapperParams["mergeParams"]+=(" --{0}={1}".format(k,self.conf._argdict[k]))
2088
2089
2090 with open("args.json", "w") as f:
2091 json.dump(wrapperParams, f)
2092
2093 self._cmd.append("--argJSON=args.json")
2094
2095
2096
2097 def validate(self):
2098 self.setValStart()
2099 super(DQMPostProcessExecutor, self).validate()
2100
2101 exitErrorMessage = ''
2102 # Base class validation successful, Now scan the logfile for missed errors.
2103 try:
2105 worstError = logScan.worstError()
2106
2107 # In general we add the error message to the exit message, but if it's too long then don't do
2108 # that and just say look in the jobReport
2109 if worstError['firstError']:
2110 if len(worstError['firstError']['message']) > logScan._msgLimit:
2111 exitErrorMessage = "Long {0} message at line {1}" \
2112 " (see jobReport for further details)".format(worstError['level'],
2113 worstError['firstError']['firstLine'])
2114 else:
2115 exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
2116 worstError['firstError']['message'])
2117 except OSError as e:
2118 exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2120 'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))
2121
2122 if worstError['nLevel'] == stdLogLevels['ERROR'] and (
2123 'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
2124 msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2125
2126 elif worstError['nLevel'] >= stdLogLevels['ERROR']:
2127 self._isValidated = False
2128 msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
2129 exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2130 raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))
2131
2132 # Must be ok if we got here!
2133 msg.info('Executor {0} has validated successfully'.format(self.name))
2134 self._isValidated = True
2135
2136 self._valStop = os.times()
2137 msg.debug('valStop time is {0}'.format(self._valStop))
2138
2140
2141 def preExecute(self, input = set(), output = set()):
2142 self.setPreExeStart()
2143 msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
2144
2145 # Basic command, and allow overwrite of the output file
2146 if self._exe is None:
2147 self._exe = 'hadd'
2148 self._cmd = [self._exe, "-f"]
2149
2150
2151 # Add the output file
2152 if len(output) != 1:
2153 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
2154 'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
2155 outDataType = list(output)[0]
2156 self._cmd.append(self.conf.dataDictionary[outDataType].value[0])
2157 # Add to be merged to the cmd chain
2158 for dataType in input:
2159 self._cmd.extend(self.conf.dataDictionary[dataType].value)
2160
2161 super(NTUPMergeExecutor, self).preExecute(input=input, output=output)
2162
2163
2165 """Executor for running physvalPostProcessing.py with <input> <output> args"""
2166
2167 def preExecute(self, input = set(), output = set()):
2168 self.setPreExeStart()
2169 msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
2170
2171 self._cmd = [self.exe, ]
2172
2173 if len(input) != 1 or len(output) != 1:
2174 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
2175 f'Exactly one input and one output must be specified (got inputs={len(input)}, outputs={len(output)})')
2176
2177 self._cmd.append(self.conf.dataDictionary[list(input)[0]].value[0])
2178 self._cmd.append(self.conf.dataDictionary[list(output)[0]].value[0])
2179
2180 # Finalize execution setup by calling the parent class method
2181 super(NtupPhysValPostProcessingExecutor, self).preExecute(input=input, output=output)
2182
2183
2185
2186 def preExecute(self, input = set(), output = set()):
2187 self.setPreExeStart()
2188 self._inputBS = list(input)[0]
2189 self._outputBS = list(output)[0]
2191 self._useStubFile = False
2192 if 'maskEmptyInputs' in self.conf.argdict and self.conf.argdict['maskEmptyInputs'].value is True:
2193 eventfullFiles = []
2194 for fname in self.conf.dataDictionary[self._inputBS].value:
2195 nEvents = self.conf.dataDictionary[self._inputBS].getSingleMetadata(fname, 'nentries')
2196 msg.debug('Found {0} events in file {1}'.format(nEvents, fname))
2197 if isinstance(nEvents, int) and nEvents > 0:
2198 eventfullFiles.append(fname)
2199 self._maskedFiles = list(set(self.conf.dataDictionary[self._inputBS].value) - set(eventfullFiles))
2200 if len(self._maskedFiles) > 0:
2201 msg.info('The following input files are masked because they have 0 events: {0}'.format(' '.join(self._maskedFiles)))
2202 if len(eventfullFiles) == 0:
2203 if 'emptyStubFile' in self.conf.argdict and path.exists(self.conf.argdict['emptyStubFile'].value):
2204 self._useStubFile = True
2205 msg.info("All input files are empty - will use stub file {0} as output".format(self.conf.argdict['emptyStubFile'].value))
2206 else:
2207 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
2208 'All input files had zero events - aborting BS merge')
2209
2210 # Write the list of input files to a text file, so that testMergedFiles can swallow it
2211 self._mergeBSFileList = '{0}.list'.format(self._name)
2212 self._mergeBSLogfile = '{0}.out'.format(self._name)
2213 try:
2214 with open(self._mergeBSFileList, 'w') as BSFileList:
2215 for fname in self.conf.dataDictionary[self._inputBS].value:
2216 if fname not in self._maskedFiles:
2217 print(fname, file=BSFileList)
2218 except OSError as e:
2219 errMsg = 'Got an error when writing list of BS files to {0}: {1}'.format(self._mergeBSFileList, e)
2220 msg.error(errMsg)
2221 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'), errMsg)
2222
2223 # Hope that we were given a correct filename...
2224 self._outputFilename = self.conf.dataDictionary[self._outputBS].value[0]
2225 if self._outputFilename.endswith('._0001.data'):
2226 self._doRename = False
2227 self._outputFilename = self._outputFilename.split('._0001.data')[0]
2228 elif self.conf.argdict['allowRename'].value is True:
2229 # OK, non-fatal, we go for a renaming
2230 msg.info('Output filename does not end in "._0001.data" will proceed, but be aware that the internal filename metadata will be wrong')
2231 self._doRename = True
2232 else:
2233 # No rename allowed, so we are dead...
2234 errmsg = 'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'
2235 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'), errmsg)
2236
2237 # Set the correct command for execution
2238 self._cmd = [self._exe, self._mergeBSFileList, '0', self._outputFilename]
2239
2240 super(bsMergeExecutor, self).preExecute(input=input, output=output)
2241
2242 def execute(self):
2243 if self._useStubFile:
2244 # Need to fake execution!
2245 self._exeStart = os.times()
2246 msg.debug('exeStart time is {0}'.format(self._exeStart))
2247 msg.info("Using stub file for empty BS output - execution is fake")
2248 if self._outputFilename != self.conf.argdict['emptyStubFile'].value:
2249 os.rename(self.conf.argdict['emptyStubFile'].value, self._outputFilename)
2250 self._memMonitor = False
2251 self._hasExecuted = True
2252 self._rc = 0
2253 self._exeStop = os.times()
2254 msg.debug('exeStop time is {0}'.format(self._exeStop))
2255 else:
2256 super(bsMergeExecutor, self).execute()
2257
2258 def postExecute(self):
2259 if self._useStubFile:
2260 pass
2261 elif self._doRename:
2262 self._expectedOutput = self._outputFilename + '._0001.data'
2263 msg.info('Renaming {0} to {1}'.format(self._expectedOutput, self.conf.dataDictionary[self._outputBS].value[0]))
2264 try:
2265 os.rename(self._outputFilename + '._0001.data', self.conf.dataDictionary[self._outputBS].value[0])
2266 except OSError as e:
2267 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
2268 'Exception raised when renaming {0} to {1}: {2}'.format(self._outputFilename, self.conf.dataDictionary[self._outputBS].value[0], e))
2269 super(bsMergeExecutor, self).postExecute()
2270
2271
2272
2274
2275 def preExecute(self, input = set(), output = set()):
2276 self.setPreExeStart()
2277 self._memMonitor = False
2278
2279 #archiving
2280 if self._exe == 'zip':
2281 if 'outputArchFile' not in self.conf.argdict:
2282 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_MISSING'), 'Missing output file name')
2283
2284 self._cmd = ['python']
2285 try:
2286 with open('zip_wrapper.py', 'w') as zip_wrapper:
2287 print("import zipfile, os, shutil", file=zip_wrapper)
2288 if os.path.exists(self.conf.argdict['outputArchFile'].value[0]):
2289 #appending input file(s) to existing archive
2290 print("zf = zipfile.ZipFile('{}', mode='a', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
2291 else:
2292 #creating new archive
2293 print("zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
2294 print("for f in {}:".format(self.conf.argdict['inputDataFile'].value), file=zip_wrapper)
2295 #This module gives false positives (as of python 3.7.0). Will also check the name for ".zip"
2296 #print >> zip_wrapper, " if zipfile.is_zipfile(f):"
2297 print(" if zipfile.is_zipfile(f) and '.zip' in f:", file=zip_wrapper)
2298 print(" archive = zipfile.ZipFile(f, mode='r')", file=zip_wrapper)
2299 print(" print 'Extracting input zip file {0} to temporary directory {1}'.format(f,'tmp')", file=zip_wrapper)
2300 print(" archive.extractall('tmp')", file=zip_wrapper)
2301 print(" archive.close()", file=zip_wrapper)
2302 # remove stuff as soon as it is saved to output in order to save disk space at worker node
2303 print(" if os.access(f, os.F_OK):", file=zip_wrapper)
2304 print(" print 'Removing input zip file {}'.format(f)", file=zip_wrapper)
2305 print(" os.unlink(f)", file=zip_wrapper)
2306 print(" if os.path.isdir('tmp'):", file=zip_wrapper)
2307 print(" for root, dirs, files in os.walk('tmp'):", file=zip_wrapper)
2308 print(" for name in files:", file=zip_wrapper)
2309 print(" print 'Zipping {}'.format(name)", file=zip_wrapper)
2310 print(" zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
2311 print(" shutil.rmtree('tmp')", file=zip_wrapper)
2312 print(" else:", file=zip_wrapper)
2313 print(" print 'Zipping {}'.format(os.path.basename(f))", file=zip_wrapper)
2314 print(" zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
2315 print(" if os.access(f, os.F_OK):", file=zip_wrapper)
2316 print(" print 'Removing input file {}'.format(f)", file=zip_wrapper)
2317 print(" os.unlink(f)", file=zip_wrapper)
2318 print("zf.close()", file=zip_wrapper)
2319 os.chmod('zip_wrapper.py', 0o755)
2320 except OSError as e:
2321 errMsg = 'error writing zip wrapper {fileName}: {error}'.format(fileName = 'zip_wrapper.py',
2322 error = e
2323 )
2324 msg.error(errMsg)
2325 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
2326 errMsg
2327 )
2328 self._cmd.append('zip_wrapper.py')
2329
2330 #unarchiving
2331 elif self._exe == 'unarchive':
2332 import zipfile
2333 for infile in self.conf.argdict['inputArchFile'].value:
2334 if not zipfile.is_zipfile(infile):
2335 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
2336 'An input file is not a zip archive - aborting unpacking')
2337 self._cmd = ['python']
2338 try:
2339 with open('unarchive_wrapper.py', 'w') as unarchive_wrapper:
2340 print("import zipfile", file=unarchive_wrapper)
2341 print("for f in {}:".format(self.conf.argdict['inputArchFile'].value), file=unarchive_wrapper)
2342 print(" archive = zipfile.ZipFile(f, mode='r')", file=unarchive_wrapper)
2343 print(" path = '{}'".format(self.conf.argdict['path']), file=unarchive_wrapper)
2344 print(" print 'Extracting archive {0} to {1}'.format(f,path)", file=unarchive_wrapper)
2345 print(" archive.extractall(path)", file=unarchive_wrapper)
2346 print(" archive.close()", file=unarchive_wrapper)
2347 os.chmod('unarchive_wrapper.py', 0o755)
2348 except OSError as e:
2349 errMsg = 'error writing unarchive wrapper {fileName}: {error}'.format(fileName = 'unarchive_wrapper.py',
2350 error = e
2351 )
2352 msg.error(errMsg)
2353 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
2354 errMsg
2355 )
2356 self._cmd.append('unarchive_wrapper.py')
2357 super(archiveExecutor, self).preExecute(input=input, output=output)
2358
static Double_t rc
void print(char *figname, TCanvas *c1)
#define min(a, b)
Definition cfImp.cxx:40
Argument class for substep lists, suitable for preExec/postExec.
Class holding the update to an environment that will be passed on to an executor.
Definition trfEnv.py:15
Base class for execution exceptions.
Exception class for validation failures detected by parsing logfiles.
Specialist execution class for DQM post-processing of histograms.
Definition trfExe.py:2040
__init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']), exe='DQM_Tier0Wrapper_tf.py', exeArgs=[], memMonitor=True)
Definition trfExe.py:2042
preExecute(self, input=set(), output=set())
Definition trfExe.py:2050
Specialist execution class for merging DQ histograms.
Definition trfExe.py:1949
__init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']), exe='DQHistogramMerge.py', exeArgs=[], memMonitor=True)
Definition trfExe.py:1951
preExecute(self, input=set(), output=set())
Definition trfExe.py:1959
Specialist execution class for merging NTUPLE files.
Definition trfExe.py:2139
preExecute(self, input=set(), output=set())
Definition trfExe.py:2141
Specialist execution class for running post processing on merged PHYVAL NTUPLE file.
Definition trfExe.py:2164
preExecute(self, input=set(), output=set())
Definition trfExe.py:2167
preExecute(self, input=set(), output=set())
Definition trfExe.py:1875
__init__(self, name='hybridPOOLMerge', trf=None, conf=None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton', inData=set(), outData=set(), exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, manualDataDictionary=None, memMonitor=True)
Initialise hybrid POOL merger athena executor.
Definition trfExe.py:1867
Archive transform.
Definition trfExe.py:2273
preExecute(self, input=set(), output=set())
Definition trfExe.py:2275
_writeAthenaWrapper(self, asetup=None, dbsetup=None, ossetup=None)
Write a wrapper script which runs asetup and then Athena.
Definition trfExe.py:1571
_smartMerge(self, fileArg)
Manage smart merging of output files.
Definition trfExe.py:1748
preExecute(self, input=set(), output=set())
Definition trfExe.py:973
__init__(self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
Initialise athena executor.
Definition trfExe.py:889
_prepAthenaCommandLine(self)
Prepare the correct command line to be used to invoke athena.
Definition trfExe.py:1454
_isCAEnabled(self)
Check if running with CA.
Definition trfExe.py:1434
_skeletonCA
Handle MPI setup.
Definition trfExe.py:905
_tryDropAndReload
Add –drop-and-reload if possible (and allowed!)
Definition trfExe.py:893
_envUpdate
Look for environment updates and perpare the athena command line.
Definition trfExe.py:1226
Specalise the script executor to deal with the BS merge oddity of excluding empty DRAWs.
Definition trfExe.py:2184
preExecute(self, input=set(), output=set())
Definition trfExe.py:2186
__init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Definition trfExe.py:596
__init__(self, name='Echo', trf=None)
Definition trfExe.py:574
setFromTransform(self, trf)
Set configuration properties from the parent transform.
Definition trfExe.py:113
addToArgdict(self, key, value)
Add a new object to the argdict.
Definition trfExe.py:118
addToDataDictionary(self, key, value)
Add a new object to the dataDictionary.
Definition trfExe.py:122
__init__(self, argdict={}, dataDictionary={}, firstExecutor=False)
Configuration for an executor.
Definition trfExe.py:63
Special executor that will enable a logfile scan as part of its validation.
Definition trfExe.py:506
__init__(self, name='Logscan')
Definition trfExe.py:507
preExecute(self, input=set(), output=set())
Definition trfExe.py:512
Athena executor where failure is not consisered fatal.
Definition trfExe.py:1835
Specialist executor to manage the handling of multiple implicit input and output files within the der...
Definition trfExe.py:1887
preExecute(self, input=set(), output=set())
Take inputDAODFile and setup the actual outputs needed in this job.
Definition trfExe.py:1890
preExecute(self, input=set(), output=set())
Definition trfExe.py:658
__init__(self, name='Script', trf=None, conf=None, inData=set(), outData=set(), exe=None, exeArgs=None, memMonitor=True)
Definition trfExe.py:621
Executors always only even execute a single step, as seen by the transform.
Definition trfExe.py:127
myMerger(self)
Now define properties for these data members.
Definition trfExe.py:207
__init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Base class initaliser for transform executors.
Definition trfExe.py:138
conf
Executor configuration:
Definition trfExe.py:163
preExecute(self, input=set(), output=set())
Definition trfExe.py:471
doAll(self, input=set(), output=set())
Convenience function.
Definition trfExe.py:499
Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGL...
Small class used for vailiadating event counts between input and output files.
Class of patterns that can be ignored from athena logfiles.
STL class.
std::vector< std::string > intersection(std::vector< std::string > &v1, std::vector< std::string > &v2)
bool add(const std::string &hname, TKey *tobj)
Definition fastadd.cxx:55
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
Transform argument class definitions.
Module for transform exit codes.
Contains functions related Athena Job Options files.
Logging configuration for ATLAS job transforms.
Utilities for handling MPI-Athena jobs.
Utilities for handling AthenaMP jobs.
Utilities for handling AthenaMT jobs.
Transform utility functions.
Validation control for job transforms.
Definition index.py:1
_encoding_stream(s)
Definition trfExe.py:45