ATLAS Offline Software
Loading...
Searching...
No Matches
trfExe.py
Go to the documentation of this file.
1# Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
2
8
9import json
10import math
11import os
12import os.path as path
13import re
14import signal
15import subprocess
16import sys
17import time
18
19import logging
20from fnmatch import fnmatch
21msg = logging.getLogger(__name__)
22
23from PyJobTransforms.trfJobOptions import JobOptionsTemplate
24from PyJobTransforms.trfUtils import asetupReport, asetupReleaseIsOlderThan, unpackDBRelease, setupDBRelease, \
25 cvmfsDBReleaseCheck, forceToAlphaNum, \
26 ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
27from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix
28from PyJobTransforms.trfExitCodes import trfExit
29from PyJobTransforms.trfLogger import stdLogLevels
30from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler
31from PyJobTransforms.trfMTTools import detectAthenaMTThreads
32
33import PyJobTransforms.trfExceptions as trfExceptions
34import PyJobTransforms.trfValidation as trfValidation
35import PyJobTransforms.trfArgClasses as trfArgClasses
36import PyJobTransforms.trfEnv as trfEnv
38
39
40# Depending on the setting of LANG, sys.stdout may end up with ascii or ansi
41# encoding, rather than utf-8. But Athena uses unicode for some log messages
42# (another example of Gell-Mann's totalitarian principle) and that will result
43# in a crash with python 3. In such a case, force the use of a utf-8 encoded
44# output stream instead.
46 enc = s.encoding.lower()
47 if enc.find('ascii') >= 0 or enc.find('ansi') >= 0:
48 return open (s.fileno(), 'w', encoding='utf-8')
49 return s
50
51
52
57class executorConfig(object):
58
59
63 def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False):
64 self._argdict = argdict
65 self._dataDictionary = dataDictionary
66 self._firstExecutor = firstExecutor
67 self._executorStep = -1
69
    ## Transform argument dictionary used by this executor
    @property
    def argdict(self):
        return self._argdict

    @argdict.setter
    def argdict(self, value):
        self._argdict = value

    ## Map of data type name -> data object
    @property
    def dataDictionary(self):
        return self._dataDictionary

    @dataDictionary.setter
    def dataDictionary(self, value):
        self._dataDictionary = value

    ## True when this executor is the first in the transform chain
    @property
    def firstExecutor(self):
        return self._firstExecutor

    @firstExecutor.setter
    def firstExecutor(self, value):
        self._firstExecutor = value

    ## Current executor step index (-1 until explicitly set)
    @property
    def executorStep(self):
        return self._executorStep

    @executorStep.setter
    def executorStep(self, value):
        self._executorStep = value
101
102 @property
104 return self._totalExecutorSteps
105
106 @totalExecutorSteps.setter
107 def totalExecutorSteps(self, value):
108 self._totalExecutorSteps = value
109
110
    ## Copy the argument and data dictionaries from the parent transform
    def setFromTransform(self, trf):
        self._argdict = trf.argdict
        self._dataDictionary = trf.dataDictionary

    ## Add a single key/value pair to the argument dictionary
    def addToArgdict(self, key, value):
        self._argdict[key] = value

    ## Add a single key/value pair to the data dictionary
    def addToDataDictionary(self, key, value):
        self._dataDictionary[key] = value
124
125
126
127class transformExecutor(object):
128
129
    def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
        """Base class for all transform executors.

        @param name: executor name (forced to alphanumeric characters)
        @param trf: parent transform, used to populate the configuration
        @param conf: an executorConfig; when None a new one is created
        @param inData: data types this executor can consume
        @param outData: data types this executor produces
        @raises trfExceptions.TransformSetupException if inData and outData overlap
        """
        # Some information to produce helpful log messages

        self._name = forceToAlphaNum(name)
        # Data this executor can start from and produce
        # Note we transform NULL to inNULL and outNULL as a convenience
        self._inData = set(inData)
        self._outData = set(outData)
        if 'NULL' in self._inData:
            self._inData.remove('NULL')
            self._inData.add('inNULL')
        if 'NULL' in self._outData:
            self._outData.remove('NULL')
            self._outData.add('outNULL')

        # It's forbidden for an executor to consume and produce the same datatype
        dataOverlap = self._inData & self._outData
        if len(dataOverlap) > 0:
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
                                                        'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self._name, ' '.join(dataOverlap)))

        # Take the given configuration, or build one from the transform
        if conf is not None:
            self.conf = conf
        else:
            self.conf = executorConfig()
            if trf is not None:
                self.conf.setFromTransform(trf)

        # Execution status
        self._hasExecuted = False
        self._rc = -1
        self._errMsg = None

        # Validation status
        self._hasValidated = False
        self._isValidated = False

        # Extra metadata
        # This dictionary holds extra metadata for this executor which will be
        # provided in job reports

        # Timing samples are os.times() tuples, None until recorded
        self._preExeStart = None
        self._exeStart = self._exeStop = None
        self._valStart = self._valStop = None
        self._memStats = {}
        self._memFullFile = None
        self._eventCount = None
        self._athenaMP = 0
        self._athenaMT = 0
        self._dbMonitor = None
        self._resimevents = None
        self._containerSetup = None
        # NOTE(review): attributes such as _extraMetadata, _memLeakResult,
        # _alreadyInContainer and _workdir are read elsewhere in this class
        # but their initialisation is not visible in this view - confirm
        # against the upstream file.

        # Holder for execution information about any merges done by this executor in MP mode
        self._myMerger = []
203
204
205
    ## Execution records of merges performed by this executor in MP mode
    @property
    def myMerger(self):
        return self._myMerger

    ## Executor name
    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    ## Substep name, or None when no substep was ever set
    #  (uses 'in dir(self)' so it is safe before the attribute exists)
    @property
    def substep(self):
        if '_substep' in dir(self):
            return self._substep
        return None

    ## Parent transform, or None when not attached
    @property
    def trf(self):
        if '_trf' in dir(self):
            return self._trf
        return None

    @trf.setter
    def trf(self, value):
        self._trf = value

    ## Data types this executor can consume, or None if never set
    @property
    def inData(self):
        if '_inData' in dir(self):
            return self._inData
        return None

    @inData.setter
    def inData(self, value):
        self._inData = set(value)

    ## Extend the consumable data types (creates the set on first use)
    def inDataUpdate(self, value):
        if '_inData' in dir(self):
            self._inData.update(value)
        else:
            # Use the setter, which turns value into a set
            self.inData = value

    ## Data types this executor produces, or None if never set
    @property
    def outData(self):
        if '_outData' in dir(self):
            return self._outData
        return None

    @outData.setter
    def outData(self, value):
        self._outData = set(value)

    ## Extend the produced data types (creates the set on first use)
    def outDataUpdate(self, value):
        if '_outData' in dir(self):
            self._outData.update(value)
        else:
            # Use the setter, which turns value into a set
            self.outData = value

    ## Actual input data for this execution, or None before preExecute
    @property
    def input(self):
        if '_input' in dir(self):
            return self._input
        return None

    ## Actual output data for this execution, or None before preExecute
    @property
    def output(self):
        if '_output' in dir(self):
            return self._output
        return None

    ## Extra metadata dictionary published in job reports
    @property
    def extraMetadata(self):
        return self._extraMetadata

    ## True once execute() has run
    @property
    def hasExecuted(self):
        return self._hasExecuted

    ## Return code of the last execution (-1 until executed)
    @property
    def rc(self):
        return self._rc

    ## Error message from the last execution/validation (None until set)
    @property
    def errMsg(self):
        return self._errMsg

    ## Validation object
    #  NOTE(review): _validation is not initialised in this view - the getter
    #  raises AttributeError unless the setter was called first; confirm upstream.
    @property
    def validation(self):
        return self._validation

    @validation.setter
    def validation(self, value):
        self._validation = value

    ## True once validate() has run
    @property
    def hasValidated(self):
        return self._hasValidated

    ## True when validation succeeded
    @property
    def isValidated(self):
        return self._isValidated

    ## 'first executor' flag, or None when it was never set
    @property
    def first(self):
        if hasattr(self, '_first'):
            return self._first
        else:
            return None
330
331 @property
333 return self._preExeStart
334
    ## os.times() sample taken at the start of execute(), or None
    @property
    def exeStartTimes(self):
        return self._exeStart

    ## os.times() sample taken at the end of execute(), or None
    @property
    def exeStopTimes(self):
        return self._exeStop

    ## os.times() sample taken at the start of validate(), or None
    @property
    def valStartTimes(self):
        return self._valStart

    ## os.times() sample taken at the end of validate(), or None
    @property
    def valStopTimes(self):
        return self._valStop
350
351 @property
352 def preExeCpuTime(self):
353 if self._preExeStart and self._exeStart:
354 return calcCpuTime(self._preExeStart, self._exeStart)
355 else:
356 return None
357
358 @property
359 def preExeWallTime(self):
360 if self._preExeStart and self._exeStart:
361 return calcWallTime(self._preExeStart, self._exeStart)
362 else:
363 return None
364
365 @property
366 def cpuTime(self):
367 if self._exeStart and self._exeStop:
368 return calcCpuTime(self._exeStart, self._exeStop)
369 else:
370 return None
371
372 @property
373 def usrTime(self):
374 if self._exeStart and self._exeStop:
375 return self._exeStop[2] - self._exeStart[2]
376 else:
377 return None
378
379 @property
380 def sysTime(self):
381 if self._exeStart and self._exeStop:
382 return self._exeStop[3] - self._exeStart[3]
383 else:
384 return None
385
386 @property
387 def wallTime(self):
388 if self._exeStart and self._exeStop:
389 return calcWallTime(self._exeStart, self._exeStop)
390 else:
391 return None
392
    ## Memory statistics harvested from the prmon summary JSON
    @property
    def memStats(self):
        return self._memStats

    ## Memory-leak analysis result
    #  NOTE(review): _memLeakResult is not initialised anywhere in this view -
    #  confirm upstream where it is set.
    @property
    def memAnalysis(self):
        return self._memLeakResult

    ## CPU time between the end of execute and the start of validate, or None
    @property
    def postExeCpuTime(self):
        if self._exeStop and self._valStart:
            return calcCpuTime(self._exeStop, self._valStart)
        else:
            return None
407
408 @property
410 if self._exeStop and self._valStart:
411 return calcWallTime(self._exeStop, self._valStart)
412 else:
413 return None
414
415 @property
417 if self._valStart and self._valStop:
418 return calcCpuTime(self._valStart, self._valStop)
419 else:
420 return None
421
422 @property
424 if self._valStart and self._valStop:
425 return calcWallTime(self._valStart, self._valStop)
426 else:
427 return None
428
429 @property
430 def cpuTimeTotal(self):
431 if self._preExeStart and self._valStop:
432 return calcCpuTime(self._preExeStart, self._valStop)
433 else:
434 return None
435
436 @property
437 def wallTimeTotal(self):
438 if self._preExeStart and self._valStop:
439 return calcWallTime(self._preExeStart, self._valStop)
440 else:
441 return None
442
    ## Number of events processed, as determined during validation (None until counted)
    @property
    def eventCount(self):
        return self._eventCount

    ## Count of events passing the simulation filter on re-simulation (None until set)
    @property
    def reSimEvent(self):
        return self._resimevents

    ## Number of AthenaMP worker processes (0 means MP not active)
    @property
    def athenaMP(self):
        return self._athenaMP

    ## Database monitoring information (None until set)
    @property
    def dbMonitor(self):
        return self._dbMonitor
458
459
460 # set start times, if not set already
461 def setPreExeStart(self):
462 if self._preExeStart is None:
463 self._preExeStart = os.times()
464 msg.debug('preExeStart time is {0}'.format(self._preExeStart))
465
466 def setValStart(self):
467 if self._valStart is None:
468 self._valStart = os.times()
469 msg.debug('valStart time is {0}'.format(self._valStart))
470
    ## Base pre-execution hook: records the start time and logs; subclasses override
    def preExecute(self, input = set(), output = set()):
        self.setPreExeStart()
        msg.info('Preexecute for %s', self._name)
474
475 def execute(self):
476 self._exeStart = os.times()
477 msg.debug('exeStart time is {0}'.format(self._exeStart))
478 msg.info('Starting execution of %s', self._name)
479 self._hasExecuted = True
480 self._rc = 0
481 self._errMsg = ''
482 msg.info('%s executor returns %d', self._name, self._rc)
483 self._exeStop = os.times()
484 msg.debug('preExeStop time is {0}'.format(self._exeStop))
485
    ## Base post-execution hook: only logs; subclasses override
    def postExecute(self):
        msg.info('Postexecute for %s', self._name)
488
    ## Base validation: no checks are performed, so always succeeds
    def validate(self):
        self.setValStart()
        self._hasValidated = True
        msg.info('Executor %s has no validation function - assuming all ok', self._name)
        self._isValidated = True
        self._errMsg = ''
        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))
497
498
    ## Convenience: run the full executor lifecycle in order
    #  (preExecute -> execute -> postExecute -> validate)
    def doAll(self, input=set(), output=set()):
        self.preExecute(input, output)
        self.execute()
        self.postExecute()
        self.validate()
504
505
    def __init__(self, name = 'Logscan'):
        """Executor that only scans an existing logfile for errors (no execution)."""
        super(logscanExecutor, self).__init__(name=name)
        self._errorMaskFiles = None
        self._logFileName = None
511
    ## Pick up the logfile name to scan from the 'logfile' argument, if given
    def preExecute(self, input = set(), output = set()):
        self.setPreExeStart()
        msg.info('Preexecute for %s', self._name)
        if 'logfile' in self.conf.argdict:
            self._logFileName = self.conf.argdict['logfile'].value
517
    def validate(self):
        """Scan the configured logfile for errors and validate accordingly.

        Builds the ignore-pattern list (from 'ignoreFiles', the executor's
        error mask files, or the athenaExecutor default), scans the logfile,
        and raises TransformLogfileErrorException on ERROR or worse unless
        ignoreErrors=True.
        """
        self.setValStart()
        msg.info("Starting validation for {0}".format(self._name))
        if self._logFileName:
            # Extra search patterns from the command line, if any
            if 'ignorePatterns' in self.conf.argdict:
                igPat = self.conf.argdict['ignorePatterns'].value
            else:
                igPat = []
            if 'ignoreFiles' in self.conf.argdict:
                ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
            elif self._errorMaskFiles is not None:
                ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
            else:
                ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)

            # Now actually scan my logfile
            msg.info('Scanning logfile {0} for errors'.format(self._logFileName))
            self._logScan = trfValidation.athenaLogFileReport(logfile = self._logFileName, ignoreList = ignorePatterns)
            worstError = self._logScan.worstError()

            # In general we add the error message to the exit message, but if it's too long then don't do
            # that and just say look in the jobReport
            if worstError['firstError']:
                if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
                    if 'CoreDumpSvc' in worstError['firstError']['message']:
                        exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
                    elif 'G4Exception' in worstError['firstError']['message']:
                        exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
                    else:
                        exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
                else:
                    exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
            else:
                exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])

            # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
            if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
                msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
            elif worstError['nLevel'] >= stdLogLevels['ERROR']:
                self._isValidated = False
                msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
                raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
                                                                   'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))

        # Must be ok if we got here!
        msg.info('Executor {0} has validated successfully'.format(self.name))
        self._isValidated = True
        self._errMsg = ''

        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))
571
572
    def __init__(self, name = 'Echo', trf = None):
        """Executor that simply echoes the transform's argument dictionary."""
        # We are only changing the default name here
        super(echoExecutor, self).__init__(name=name, trf=trf)
578
579
580 def execute(self):
581 self._exeStart = os.times()
582 msg.debug('exeStart time is {0}'.format(self._exeStart))
583 msg.info('Starting execution of %s', self._name)
584 msg.info('Transform argument dictionary now follows:')
585 for k, v in self.conf.argdict.items():
586 print("%s = %s" % (k, v))
587 self._hasExecuted = True
588 self._rc = 0
589 self._errMsg = ''
590 msg.info('%s executor returns %d', self._name, self._rc)
591 self._exeStop = os.times()
592 msg.debug('exeStop time is {0}'.format(self._exeStop))
593
594
    def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
        """Executor that creates empty placeholder output files instead of running anything."""
        # We are only changing the default name here
        super(dummyExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
600
601
602 def execute(self):
603 self._exeStart = os.times()
604 msg.debug('exeStart time is {0}'.format(self._exeStart))
605 msg.info('Starting execution of %s', self._name)
606 for type in self._outData:
607 for k, v in self.conf.argdict.items():
608 if type in k:
609 msg.info('Creating dummy output file: {0}'.format(self.conf.argdict[k].value[0]))
610 open(self.conf.argdict[k].value[0], 'a').close()
611 self._hasExecuted = True
612 self._rc = 0
613 self._errMsg = ''
614 msg.info('%s executor returns %d', self._name, self._rc)
615 self._exeStop = os.times()
616 msg.debug('exeStop time is {0}'.format(self._exeStop))
617
618
    def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData = set(),
                 exe = None, exeArgs = None, memMonitor = True):
        """Executor that runs an external script/binary via subprocess.

        @param exe: name of the script/binary to execute
        @param exeArgs: argdict keys whose values are pasted onto the command line
        @param memMonitor: when True, spawn prmon to monitor the child's memory
        """
        # Name of the script we want to execute
        self._exe = exe

        # With arguments (currently this means paste in the corresponding _argdict entry)
        self._exeArgs = exeArgs

        super(scriptExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)

        self._extraMetadata.update({'script' : exe})

        # Decide if we echo script output to stdout
        self._echoOutput = False

        # Can either be written by base class or child
        self._cmd = None

        self._memMonitor = memMonitor
639
    ## Script/binary name; the setter keeps the job-report metadata in sync
    @property
    def exe(self):
        return self._exe

    @exe.setter
    def exe(self, value):
        self._exe = value
        self._extraMetadata['script'] = value

    ## Argdict keys to paste onto the command line
    @property
    def exeArgs(self):
        return self._exeArgs

    @exeArgs.setter
    def exeArgs(self, value):
        self._exeArgs = value
#        self._extraMetadata['scriptArgs'] = value
657
    def preExecute(self, input = set(), output = set()):
        """Build the command line, decide on output echoing and set up loggers.

        Command echoing is controlled by (in priority order) TRF_ECHO,
        TRF_NOECHO, an interactive terminal, or a Tier-0 environment (TZHOME).
        A per-executor file logger always writes to log.<name>.
        @raises trfExceptions.TransformExecutionException when no executable is set.
        """
        self.setPreExeStart()
        msg.debug('scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

        self._input = input
        self._output = output

        # NOTE(review): the body of this 'if' is not visible in this view
        # (a statement, likely building the standard command, appears to have
        # been stripped) - confirm against the upstream file.
        if self._cmd is None:
        msg.info('Will execute script as %s', self._cmd)

        # Define this here to have it for environment detection messages
        self._logFileName = "log.{0}".format(self._name)

        # Decide whether to echo the child's output to stdout
        if 'TRF_ECHO' in os.environ:
            msg.info('TRF_ECHO envvar is set - enabling command echoing to stdout')
            self._echoOutput = True
        elif 'TRF_NOECHO' in os.environ:
            msg.info('TRF_NOECHO envvar is set - disabling command echoing to stdout')
            self._echoOutput = False
        # PS1 is for sh, bash; prompt is for tcsh and zsh
        elif isInteractiveEnv():
            msg.info('Interactive environment detected (stdio or stdout is a tty) - enabling command echoing to stdout')
            self._echoOutput = True
        elif 'TZHOME' in os.environ:
            msg.info('Tier-0 environment detected - enabling command echoing to stdout')
            self._echoOutput = True
        if self._echoOutput is False:
            msg.info('Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self._name, self._logFileName))

        # Now setup special loggers for logging execution messages to stdout and file
        self._echologger = logging.getLogger(self._name)
        self._echologger.setLevel(logging.INFO)
        self._echologger.propagate = False

        encargs = {'encoding' : 'utf-8'}
        self._exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
        self._exeLogFile.setFormatter(logging.Formatter('%(asctime)s %(message)s', datefmt='%H:%M:%S'))
        self._echologger.addHandler(self._exeLogFile)

        if self._echoOutput:
            self._echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
            self._echostream.setFormatter(logging.Formatter('%(name)s %(asctime)s %(message)s', datefmt='%H:%M:%S'))
            self._echologger.addHandler(self._echostream)

        # Assemble the command line from the executable and the exeArgs values
        if self._exe:
            self._cmd = [self.exe, ]
        else:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                            'No executor set in {0}'.format(self.__class__.__name__))
        for arg in self.exeArgs:
            if arg in self.conf.argdict:
                # If we have a list then add each element to our list, else just str() the argument value
                # Note if there are arguments which need more complex transformations then
                # consider introducing a special toExeArg() method.
                if isinstance(self.conf.argdict[arg].value, list):
                    self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
                else:
                    self._cmd.append(str(self.conf.argdict[arg].value))
721
722
    def execute(self):
        """Run the prepared command line in a subprocess, echoing output to the executor logger.

        Optionally replaces the whole process (execOnly), optionally spawns a
        prmon memory monitor alongside the child, and records rc and timings.
        @raises trfExceptions.TransformExecutionException when the child cannot be launched.
        """
        self._hasExecuted = True
        msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))

        self._exeStart = os.times()
        msg.debug('exeStart time is {0}'.format(self._exeStart))
        if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
            msg.info('execOnly flag is set - execution will now switch, replacing the transform')
            # os.execvp never returns - the transform process becomes the command
            os.execvp(self._cmd[0], self._cmd)

        encargs = {'encoding' : 'utf8'}
        try:
            # if we are already inside a container, then
            # must map /srv of the nested container to /srv of the parent container:
            # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
            if self._alreadyInContainer and self._containerSetup is not None:
                msg.info("chdir /srv to launch a nested container for the substep")
                os.chdir("/srv")
            p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
            # go back to the original working directory immediately after to store prmon in it
            if self._alreadyInContainer and self._containerSetup is not None:
                msg.info("chdir {} after launching the nested container".format(self._workdir))
                os.chdir(self._workdir)

            if self._memMonitor:
                try:
                    self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
                    self._memFullFile = 'prmon.full.' + self._name
                    memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
                                         '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
                                         '--interval', '30']
                    mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
                    # TODO - link mem.full.current to mem.full.SUBSTEP
                except Exception as e:
                    # Best effort only: failure to monitor must not kill the job
                    msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
                    self._memMonitor = False

            # Stream the child's combined stdout/stderr into the echo logger
            while p.poll() is None:
                line = p.stdout.readline()
                if line:
                    self._echologger.info(line.rstrip())
            # Hoover up remaining buffered output lines
            for line in p.stdout:
                self._echologger.info(line.rstrip())

            self._rc = p.returncode
            msg.info('%s executor returns %d', self._name, self._rc)
            self._exeStop = os.times()
            msg.debug('exeStop time is {0}'.format(self._exeStop))
        except OSError as e:
            errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
        finally:
            if self._memMonitor:
                try:
                    # SIGUSR1 asks prmon to finalise; give it up to ~1s to exit
                    mem_proc.send_signal(signal.SIGUSR1)
                    countWait = 0
                    while (not mem_proc.poll()) and countWait < 10:
                        time.sleep(0.1)
                        countWait += 1
                except OSError:
                    pass
786
787
788 def postExecute(self):
789 if hasattr(self._exeLogFile, 'close'):
790 self._exeLogFile.close()
791 if self._memMonitor:
792 try:
793 memFile = open(self._memSummaryFile)
794 self._memStats = json.load(memFile)
795 except Exception as e:
796 msg.warning('Failed to load JSON memory summmary file {0}: {1}'.format(self._memSummaryFile, e))
797 self._memMonitor = False
798 self._memStats = {}
799
800
    def validate(self):
        """Validate on the child's return code, then check the output event count.

        rc == 0 validates; a negative rc is mapped to the shell convention
        (128 + signal number) and a TransformValidationException is raised.
        """
        if self._valStart is None:
            self._valStart = os.times()
            msg.debug('valStart time is {0}'.format(self._valStart))
        self._hasValidated = True

        if self._rc == 0:
            msg.info('Executor {0} validated successfully (return code {1})'.format(self._name, self._rc))
            self._isValidated = True
            self._errMsg = ''
        else:
            # Want to learn as much as possible from the non-zero code
            # this is a bit hard in general, although one can do signals.
            # Probably need to be more specific per exe, i.e., athena non-zero codes
            self._isValidated = False
            if self._rc < 0:
                # Map return codes to what the shell gives (128 + SIGNUM)
                self._rc = 128 - self._rc
            if trfExit.codeToSignalname(self._rc) != "":
                self._errMsg = '{0} got a {1} signal (exit code {2})'.format(self._name, trfExit.codeToSignalname(self._rc), self._rc)
            else:
                self._errMsg = 'Non-zero return code from %s (%d)' % (self._name, self._rc)
            raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_FAIL'), self._errMsg)

        # Event count check, unless explicitly skipped
        if 'checkEventCount' in self.conf.argdict and self.conf.argdict['checkEventCount'].returnMyValue(exe=self) is False:
            msg.info('Event counting for substep {0} is skipped'.format(self.name))
        else:
            # NOTE(review): 'mpi' is not imported in this view - confirm where
            # it is defined upstream.
            if 'mpi' in self.conf.argdict and not mpi.mpiShouldValidate():
                msg.info('MPI mode -- skipping output event count check')
            else:
                checkcount=trfValidation.eventMatch(self)
                checkcount.decide()
                self._eventCount = checkcount.eventCount
                msg.info('Event counting for substep {0} passed'.format(self.name))

        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))
841
842
843
    _exitMessageLimit = 200 # Maximum error message length to report in the exitMsg
    # Default error-mask file used by the logfile scanner when no explicit mask is given
    _defaultIgnorePatternFile = ['atlas_error_mask.db']
847
848
885 def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
886 inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'],
887 substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = {}, runtimeRunargs = {},
888 literalRunargs = [], dataArgs = [], checkEventCount = False, errorMaskFiles = None,
889 manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):
890
891 self._substep = forceToAlphaNum(substep)
892 self._inputEventTest = inputEventTest
893 self._tryDropAndReload = tryDropAndReload
894 self._extraRunargs = extraRunargs
895 self._runtimeRunargs = runtimeRunargs
896 self._literalRunargs = literalRunargs
897 self._dataArgs = dataArgs
898 self._errorMaskFiles = errorMaskFiles
899 self._inputDataTypeCountCheck = inputDataTypeCountCheck
900 self._disableMT = disableMT
901 self._disableMP = disableMP
902 self._onlyMP = onlyMP
903 self._onlyMT = onlyMT
904 self._onlyMPWithRunargs = onlyMPWithRunargs
905 self._skeletonCA=skeletonCA
906
907 if perfMonFile:
908 self._perfMonFile = None
909 msg.debug("Resource monitoring from PerfMon is now deprecated")
910
911 # SkeletonFile can be None (disable) or a string or a list of strings - normalise it here
912 if isinstance(skeletonFile, str):
913 self._skeleton = [skeletonFile]
914 else:
915 self._skeleton = skeletonFile
916
917 super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
918 exeArgs=exeArgs, memMonitor=memMonitor)
919
920 # Add athena specific metadata
921 self._extraMetadata.update({'substep': substep})
922
923 # Setup JO templates
924 if self._skeleton or self._skeletonCA:
925 self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
926 else:
927 self._jobOptionsTemplate = None
928
929 @property
931 return self._inputDataTypeCountCheck
932
933 @inputDataTypeCountCheck.setter
934 def inputDataTypeCountCheck(self, value):
935 self._inputDataTypeCountCheck = value
936
    ## Substep name this executor is bound to
    @property
    def substep(self):
        return self._substep

    ## When True, AthenaMP detection is skipped for this executor
    @property
    def disableMP(self):
        return self._disableMP

    @disableMP.setter
    def disableMP(self, value):
        self._disableMP = value

    ## When True, AthenaMT detection is skipped for this executor
    @property
    def disableMT(self):
        return self._disableMT

    @disableMT.setter
    def disableMT(self, value):
        self._disableMT = value

    ## When True, this configuration supports only MP (MT falls back to MP)
    @property
    def onlyMP(self):
        return self._onlyMP

    @onlyMP.setter
    def onlyMP(self, value):
        self._onlyMP = value

    ## When True, this configuration supports only MT (MP falls back to MT)
    @property
    def onlyMT(self):
        return self._onlyMT

    @onlyMT.setter
    def onlyMT(self, value):
        self._onlyMT = value

    ## ComponentAccumulator skeleton accessor
    #  NOTE(review): the sibling accessors are properties; a @property
    #  decorator may have been lost in this view - as written, callers must
    #  invoke skeletonCA() as a method. Confirm against the upstream file.
    def skeletonCA(self):
        return self._skeletonCA
975
976 def preExecute(self, input = set(), output = set()):
977 self.setPreExeStart()
978 msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
979
980 # Check we actually have events to process!
981 inputEvents = 0
982 dt = ""
983 if self._inputDataTypeCountCheck is None:
984 self._inputDataTypeCountCheck = input
985 for dataType in self._inputDataTypeCountCheck:
986 if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
987 continue
988
989 thisInputEvents = self.conf.dataDictionary[dataType].nentries
990 if thisInputEvents > inputEvents:
991 inputEvents = thisInputEvents
992 dt = dataType
993
994 # Now take into account skipEvents and maxEvents
995 if ('skipEvents' in self.conf.argdict and
996 self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
997 mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
998 else:
999 mySkipEvents = 0
1000
1001 if ('maxEvents' in self.conf.argdict and
1002 self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
1003 myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1004 else:
1005 myMaxEvents = -1
1006
1007 # Any events to process...?
1008 if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
1009 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
1010 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(mySkipEvents, inputEvents, dt))
1011
1012 try:
1013 # Expected events to process
1014 if (myMaxEvents != -1):
1015 if (self.inData and next(iter(self.inData)) == 'inNULL'):
1016 expectedEvents = myMaxEvents
1017 else:
1018 expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
1019 else:
1020 expectedEvents = inputEvents-mySkipEvents
1021 except TypeError:
1022 # catching type error from UNDEFINED inputEvents count
1023 msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
1024 expectedEvents = 0
1025
1026
1029 OSSetupString = None
1030
1031 # Extract the asetup string
1032 asetupString = None
1033 legacyThreadingRelease = False
1034 if 'asetup' in self.conf.argdict:
1035 asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1036 legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1037 else:
1038 msg.info('Asetup report: {0}'.format(asetupReport()))
1039
1040 if asetupString is not None:
1041 legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1042 currentOS = os.environ['ALRB_USER_PLATFORM']
1043 if legacyOSRelease and "centos7" not in currentOS:
1044 OSSetupString = "centos7"
1045 msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
1046
1047
1048 # allow overriding the container OS using a flag
1049 if 'runInContainer' in self.conf.argdict:
1050 OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1051 msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
1052 if OSSetupString is not None and asetupString is None:
1053 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1054 '--asetup must be used for the substep which requires --runInContainer')
1055
1056 # Conditional MP based on runtime arguments
1057 if self._onlyMPWithRunargs:
1058 for k in self._onlyMPWithRunargs:
1059 if k in self.conf._argdict:
1060 self._onlyMP = True
1061
1062 # Check the consistency of parallel configuration: CLI flags + evnironment.
1063 if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
1064 ('ATHENA_CORE_NUMBER' not in os.environ)):
1065 # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
1066 msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1067 else:
1068 # Try to detect AthenaMT mode, number of threads and number of concurrent events
1069 if not self._disableMT:
1070 self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1071
1072 # Try to detect AthenaMP mode and number of workers
1073 if not self._disableMP:
1074 self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
1075
1076 # Check that we actually support MT
1077 if self._onlyMP and self._athenaMT > 0:
1078 msg.info("This configuration does not support MT, falling back to MP")
1079 if self._athenaMP == 0:
1080 self._athenaMP = self._athenaMT
1081 self._athenaMT = 0
1083
1084 # Check that we actually support MP
1085 if self._onlyMT and self._athenaMP > 0:
1086 msg.info("This configuration does not support MP, using MT")
1087 if self._athenaMT == 0:
1088 self._athenaMT = self._athenaMP
1090 self._athenaMP = 0
1091
1092 # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
1093 # which also avoids issues with zero sized files. Distinguish from the no-input case (e.g. evgen)
1094 if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
1095 msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
1096 self._disableMP = True
1097 self._athenaMP = 0
1098
1099 # Handle executor steps
1100 if self.conf.totalExecutorSteps > 1:
1101 for dataType in output:
1102 if self.conf._dataDictionary[dataType].originalName:
1103 self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1104 else:
1105 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1106 self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
1107 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1108
1109 # And if this is (still) athenaMP, then set some options for workers and output file report
1110 if self._athenaMP > 0:
1111 self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
1112 self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
1113 self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
1114 if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
1116 else:
1117 self._athenaMPReadEventOrders = False
1118 # Decide on scheduling
1119 if ('athenaMPStrategy' in self.conf.argdict and
1120 (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
1121 self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1122 else:
1123 self._athenaMPStrategy = 'SharedQueue'
1124
1125 # See if we have options for the target output file size
1126 if 'athenaMPMergeTargetSize' in self.conf.argdict:
1127 for dataType in output:
1128 if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
1129 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
1130 msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1131 else:
1132 # Use a globbing strategy
1133 matchedViaGlob = False
1134 for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
1135 if fnmatch(dataType, mtsType):
1136 self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
1137 msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1138 matchedViaGlob = True
1139 break
1140 if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
1141 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
1142 msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1143
1144 # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
1145 # so that the mother process output file (if it exists) can be used directly
1146 # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
1147 for dataType in output:
1148 if self.conf.totalExecutorSteps <= 1:
1149 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1150 if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
1151 if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
1152 msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
1153 else:
1154 self.conf._dataDictionary[dataType].value[0] += "_000"
1155 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1156 else:
1158
1159
1160 if 'mpi' in self.conf.argdict:
1161 msg.info("Running in MPI mode")
1162 mpi.setupMPIConfig(output, self.conf.dataDictionary)
1163
1164
1165 if self._skeleton or self._skeletonCA:
1166 inputFiles = dict()
1167 for dataType in input:
1168 inputFiles[dataType] = self.conf.dataDictionary[dataType]
1169 outputFiles = dict()
1170 for dataType in output:
1171 outputFiles[dataType] = self.conf.dataDictionary[dataType]
1172
1173 # See if we have any 'extra' file arguments
1174 nameForFiles = commonExecutorStepName(self._name)
1175 for dataType, dataArg in self.conf.dataDictionary.items():
1176 if isinstance(dataArg, list) and dataArg:
1177 if self.conf.totalExecutorSteps <= 1:
1178 raise ValueError('Multiple input arguments provided but only running one substep')
1179 if self.conf.totalExecutorSteps != len(dataArg):
1180 raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1181
1182 if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
1183 inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1184 else:
1185 if dataArg.io == 'input' and nameForFiles in dataArg.executor:
1186 inputFiles[dataArg.subtype] = dataArg
1187
1188 msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1189
1190 # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
1191 self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1192 output = outputFiles)
1193
1194
1196 if len(input) > 0:
1197 self._extraMetadata['inputs'] = list(input)
1198 if len(output) > 0:
1199 self._extraMetadata['outputs'] = list(output)
1200
1201
1202 dbrelease = dbsetup = None
1203 if 'DBRelease' in self.conf.argdict:
1204 dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1205 if path.islink(dbrelease):
1206 dbrelease = path.realpath(dbrelease)
1207 if dbrelease:
1208 # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
1209 dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1210 if dbdMatch:
1211 msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1212 if not os.access(dbrelease, os.R_OK):
1213 msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1214 msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1215 dbrelease = dbdMatch.group(1)
1216 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1217 else:
1218 # Check if the DBRelease is setup
1219 msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1220 unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1221 if unpacked:
1222 # Now run the setup.py script to customise the paths to the current location...
1223 setupDBRelease(dbsetup)
1224 # For cvmfs we want just the X.Y.Z release string (and also support 'current')
1225 else:
1226 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1227
1228
1230 self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1232
1233
1234 super(athenaExecutor, self).preExecute(input, output)
1235
1236 # Now we always write a wrapper, because it's very convenient for re-running individual substeps
1237 # This will have asetup and/or DB release setups in it
1238 # Do this last in this preExecute as the _cmd needs to be finalised
1239 msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
1240 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1241 msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1242
1243
1244     def postExecute(self):
             # Post-execution handling for athena substeps:
             #  - merge MPI rank outputs,
             #  - gather AthenaMP worker outputs and restore/rename output files,
             #  - optionally tar JiveXML output and report ReSim filter statistics,
             #  - optionally delete temporary intermediate files.
1245         super(athenaExecutor, self).postExecute()
1246         # MPI merging
1247         if 'mpi' in self.conf.argdict:
1248             mpi.mergeOutputs()
1249 
1250         # Handle executor substeps
1251         if self.conf.totalExecutorSteps > 1:
1252             if self._athenaMP > 0:
1253                 outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1254                 athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
                 # Only on the final executor step do we combine the per-step outputs
1255             if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
1256                 # first loop over datasets for the output
1257                 for dataType in self._output:
1258                     newValue = []
1259                     if self._athenaMP > 0:
1260                         # assume the same number of workers all the time
1261                         for i in range(self.conf.totalExecutorSteps):
1262                             for v in self.conf.dataDictionary[dataType].value:
1263                                 newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
1264                                                           '_{0}{1}_'.format(executorStepSuffix, i)))
1265                     else:
1266                         self.conf.dataDictionary[dataType].multipleOK = True
1267                         # just combine all executors
1268                         for i in range(self.conf.totalExecutorSteps):
1269                             newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
1270                     self.conf.dataDictionary[dataType].value = newValue
1271 
1272                     # do the merging if needed
1273                     if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1274                         self._smartMerge(self.conf.dataDictionary[dataType])
1275 
1276         # If this was an athenaMP run then we need to update output files
1277         elif self._athenaMP > 0:
1278             outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
1279 
                 # In eventService mode the workers' files are managed externally, so
                 # skip the per-file existence checks in the output handler
1280             skipFileChecks=False
1281             if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
1282                 skipFileChecks=True
1283             athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, skipFileChecks, self.conf.argdict)
1284             for dataType in self._output:
1285                 if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
1286                     self._smartMerge(self.conf.dataDictionary[dataType])
1287 
1288         if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
1289             self._targzipJiveXML()
1290 
1291         # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
1292         # This is a bit ugly to have such a specific feature here though
1293         # TODO
1294         # The best is to have a general approach so that user can extract useful info from log
1295         # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
1296         # in which the wanted variable is grouped by a name, then transforms could decode the pattern
1297         # and use it to extract required info and do the summation during log scan.
1298         if self._logFileName=='log.ReSim' and self.name=='ReSim':
1299             msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
1300             self._resimevents = reportEventsPassedSimFilter(self._logFileName)
1301 
1302         # Remove intermediate input/output files of sub-steps
1303         # Delete only files with io="temporary" which are files with pattern "tmp*"
1304         # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
1305         # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
1306         # Enable if --deleteIntermediateOutputfiles is set
1307         if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
1308             inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])
1309 
1310             for k, v in inputDataDictionary.items():
1311                 if not v.io == 'temporary':
1312                     continue
1313                 for filename in v.value:
1314                     if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
1315                         msg.info("Removing intermediate {0} input file {1}".format(k, filename))
1316                         # Check if symbolic link and delete also linked file
                         # NOTE(review): 'targetpath' is only (re)assigned when the file is
                         # a symlink; for a later non-symlink file it may be unbound or hold
                         # a stale value from a previous iteration -- confirm and guard.
1317                         if (os.path.realpath(filename) != filename):
1318                             targetpath = os.path.realpath(filename)
1319                         os.unlink(filename)
1320                         if (targetpath) and os.access(targetpath, os.R_OK):
1321                             os.unlink(targetpath)
1322
1323
1324     def validate(self):
             # Validate the athena substep: check the return code (deferring any
             # failure), analyse the memory monitor output for leaks, scan the
             # logfile for errors, and raise on fatal problems.  Sets
             # self._isValidated and records validation wallclock times.
1325         self.setValStart()
1326         self._hasValidated = True
1327         deferredException = None
             # slope threshold (KB/s, presumably of the 'pss' fit) above which we flag a leak
1328         memLeakThreshold = 5000
1329         _hasMemLeak = False
1330 
1331 
1332         try:
1333             super(athenaExecutor, self).validate()
             # NOTE(review): the 'except ... as e:' clause line (original line 1334)
             # was lost in this extract -- 'e' below is that caught exception.
             # Confirm against the repository copy.
1335             # In this case we hold this exception until the logfile has been scanned
1336             msg.error('Validation of return code failed: {0!s}'.format(e))
1337             deferredException = e
1338 
1339 
             # NOTE(review): original lines 1340-1346 (including what looks like an
             # 'if self._memFullFile:' guard for the branch below) are missing from
             # this extract -- verify before editing.
1347             msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
1348             self._memLeakResult = analytic().getFittedData(self._memFullFile)
1349             if self._memLeakResult:
1350                 if self._memLeakResult['slope'] > memLeakThreshold:
1351                     _hasMemLeak = True
1352                     msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
1353             else:
1354                 msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
1355         else:
1356             msg.info('No memory monitor file to be analysed')
1357 
1358         # Logfile scan setup
1359         # Always use ignorePatterns from the command line
1360         # For patterns in files, prefer the command line first, then any special settings for
1361         # this executor, then fallback to the standard default (atlas_error_mask.db)
1362         if 'ignorePatterns' in self.conf.argdict:
1363             igPat = self.conf.argdict['ignorePatterns'].value
1364         else:
1365             igPat = []
1366         if 'ignoreFiles' in self.conf.argdict:
1367             ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
1368         elif self._errorMaskFiles is not None:
1369             ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
1370         else:
1371             ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)
1372 
1373         # Now actually scan my logfile
1374         msg.info('Scanning logfile {0} for errors in substep {1}'.format(self._logFileName, self._substep))
             # NOTE(review): original line 1375 (the 'self._logScan = ...' log-report
             # constructor call that this continuation line belongs to) is missing
             # from this extract.
1376                                                          ignoreList=ignorePatterns)
1377         worstError = self._logScan.worstError()
1378         eventLoopWarnings = self._logScan.eventLoopWarnings()
1380 
1381 
1382         # In general we add the error message to the exit message, but if it's too long then don't do
1383         # that and just say look in the jobReport
1384         if worstError['firstError']:
1385             if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
1386                 if 'CoreDumpSvc' in worstError['firstError']['message']:
1387                     exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1388                 elif 'G4Exception' in worstError['firstError']['message']:
1389                     exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1390                 else:
1391                     exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
1392             else:
1393                 exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
1394         else:
1395             exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
1396 
1397         # If we failed on the rc, then abort now
1398         if deferredException is not None:
1399             # Add any logfile information we have
1400             if worstError['nLevel'] >= stdLogLevels['ERROR']:
1401                 deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
1402             # Add the result of memory analysis
1403             if _hasMemLeak:
1404                 deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1405             raise deferredException
1406 
1407 
1408         # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
1409         if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
1410             msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
1411         # Act as if ignoreErrors=True if running in MPI, because we want to be tolerant to the occasional event failure
1412         elif worstError['nLevel'] >= stdLogLevels['ERROR'] and (not mpi.mpiShouldValidate()):
1413             msg.warning(f'Found {worstError["level"]} in the logfile in MPI rank {mpi.getMPIRank()} but moving on to be failure-tolerant')
1414         elif worstError['nLevel'] >= stdLogLevels['ERROR']:
1415             self._isValidated = False
1416             msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
1417             # Add the result of memory analysis
1418             if _hasMemLeak:
1419                 exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1420             raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
1421                                                                'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
1422 
1423         # Print event loop warnings
1424         if (len(eventLoopWarnings) > 0):
1425             msg.warning('Found WARNINGS in the event loop, as follows:')
1426             for element in eventLoopWarnings:
1427                 msg.warning('{0} {1} ({2} instances)'.format(element['item']['service'],element['item']['message'],element['count']))
1428 
1429         # Must be ok if we got here!
1430         msg.info('Executor {0} has validated successfully'.format(self.name))
1431         self._isValidated = True
1432 
1433         self._valStop = os.times()
1434         msg.debug('valStop time is {0}'.format(self._valStop))
1435
1436
1437 def _isCAEnabled(self):
1438 # CA not present
1439 if 'CA' not in self.conf.argdict:
1440 # If there is no legacy skeleton, then we are running with CA
1441 if not self._skeleton:
1442 return True
1443 else:
1444 return False
1445
1446 # CA present but None, all substeps running with CA
1447 if self.conf.argdict['CA'] is None:
1448 return True
1449
1450 # CA enabled for a substep, running with CA
1451 if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
1452 return True
1453
1454 return False
1455
1456
1458
         # Body of _prepAthenaCommandLine: builds self._cmd for the athena
         # invocation -- executable override, per-substep athenaopts, LD_PRELOAD
         # propagation, --drop-and-reload handling, MT/MP flags and topoptions.
         # NOTE(review): the 'def _prepAthenaCommandLine(self):' header line
         # (original lines 1459-1460) was lost in this extract.
1461         if 'athena' in self.conf.argdict:
1462             self._exe = self.conf.argdict['athena'].value
1463         self._cmd = [self._exe]
1464 
1465         # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
1466         currentSubstep = None
1467         if 'athenaopts' in self.conf.argdict:
1468             currentName = commonExecutorStepName(self.name)
1469             if currentName in self.conf.argdict['athenaopts'].value:
1470                 currentSubstep = currentName
                 # If options were given under both the name and the alias, merge them under the name
1471                 if self.substep in self.conf.argdict['athenaopts'].value:
1472                     msg.info('Athenaopts found for {0} and {1}, joining options. '
1473                              'Consider changing your configuration to use just the name or the alias of the substep.'
1474                              .format(currentSubstep, self.substep))
1475                     self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
1476                     del self.conf.argdict['athenaopts'].value[self.substep]
1477                     msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
1478             elif self.substep in self.conf.argdict['athenaopts'].value:
1479                 currentSubstep = self.substep
1480             elif 'all' in self.conf.argdict['athenaopts'].value:
1481                 currentSubstep = 'all'
1482 
1483         # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
1484         preLoadUpdated = dict()
1485         if 'LD_PRELOAD' in self._envUpdate._envdict:
1486             preLoadUpdated[currentSubstep] = False
1487             if 'athenaopts' in self.conf.argdict:
1488                 if currentSubstep is not None:
1489                     for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
1490                         # This code is pretty ugly as the athenaopts argument contains
1491                         # strings which are really key/value pairs
1492                         if athArg.startswith('--preloadlib'):
1493                             try:
1494                                 i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
1495                                 v = athArg.split('=', 1)[1]
1496                                 msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
                                 # Union of the user's preloads and the transform's LD_PRELOAD additions
1497                                 newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
1498                                 self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
1499                             except Exception as e:
1500                                 msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
1501                             preLoadUpdated[currentSubstep] = True
1502                             break
1503             if not preLoadUpdated[currentSubstep]:
1504                 msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1505                 if 'athenaopts' in self.conf.argdict:
1506                     if currentSubstep is not None:
1507                         self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
1508                     else:
1509                         self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
1510                 else:
1511                     self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])
1512 
1513         # Now update command line with the options we have (including any changes to preload)
1514         if 'athenaopts' in self.conf.argdict:
1515             if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
1516                 self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
1517             elif currentSubstep in self.conf.argdict['athenaopts'].value:
1518                 self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])
1519 
1520         if currentSubstep is None:
1521             currentSubstep = 'all'
1522 
1523         if self._tryDropAndReload:
1524             if self._isCAEnabled():
1525                 msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
1526             elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1527                 msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
1528             elif 'athenaopts' in self.conf.argdict:
1529                 athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
1530                 # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
1531                 if currentSubstep in self.conf.argdict['athenaopts'].value:
1532                     conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
1533                     if len(conflictOpts) > 0:
1534                         msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
1535                     else:
1536                         msg.info('Appending "--drop-and-reload" to athena options')
1537                         self._cmd.append('--drop-and-reload')
1538                 else:
1539                     msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
1540                     self._cmd.append('--drop-and-reload')
1541             else:
1542                 # This is the 'standard' case - so drop and reload should be ok
1543                 msg.info('Appending "--drop-and-reload" to athena options')
1544                 self._cmd.append('--drop-and-reload')
1545         else:
1546             msg.info('Skipping test for "--drop-and-reload" in this executor')
1547 
1548         if not self._isCAEnabled(): #For CA-jobs, threads and nproc set in runargs file
1549             # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
1550             if self._athenaMT > 0 and not self._disableMT:
1551                 if not ('athenaopts' in self.conf.argdict and
1552                         any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1553                     self._cmd.append('--threads=%s' % str(self._athenaMT))
1554 
1555             # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
1556             if self._athenaMP > 0 and not self._disableMP:
1557                 if not ('athenaopts' in self.conf.argdict and
1558                         any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1559                     self._cmd.append('--nprocs=%s' % str(self._athenaMP))
1560 
1561         # Add topoptions
1562         # Note that _writeAthenaWrapper removes this from the end of _cmd when preparing the options for VTuneCommand, so assumes it comes last.
1563         if self._skeleton or self._skeletonCA:
1564             self._cmd += self._topOptionsFiles
1565             msg.info('Updated script arguments with topoptions: %s', self._cmd)
1566 
1567
1568
1570 self,
1571 asetup = None,
1572 dbsetup = None,
1573 ossetup = None
1574 ):
1575 self._originalCmd = self._cmd
1576 self._asetup = asetup
1577 self._dbsetup = dbsetup
1578 self._containerSetup = ossetup
1579 self._workdir = os.getcwd()
1580 self._alreadyInContainer = self._workdir.startswith("/srv")
1581 self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
1582 self._setupFile = 'setup.{name}.sh'.format(name = self._name)
1583
1584 # Create a setupATLAS script
1585 setupATLAS = 'my_setupATLAS.sh'
1586 with open(setupATLAS, 'w') as f:
1587 print("#!/bin/bash", file=f)
1588 print("""
1589if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1590 export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1591fi
1592source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1593 , file=f)
1594 os.chmod(setupATLAS, 0o755)
1595
1596 msg.debug(
1597 'Preparing wrapper file {wrapperFileName} with '
1598 'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1599 wrapperFileName = self._wrapperFile,
1600 asetupStatus = self._asetup,
1601 dbsetupStatus = self._dbsetup
1602 )
1603 )
1604
1605 container_cmd = None
1606 try:
1607 with open(self._wrapperFile, 'w') as wrapper:
1608 print('#!/bin/sh', file=wrapper)
1609 if self._containerSetup is not None:
1610 container_cmd = [ os.path.abspath(setupATLAS),
1611 "-c",
1612 self._containerSetup,
1613 "--pwd",
1614 self._workdir,
1615 "-s",
1616 os.path.join('.', self._setupFile),
1617 "-r"]
1618 print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1619 print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
1620 print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1621 file = wrapper)
1622 print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1623 file=wrapper)
1624 print('echo', file=wrapper)
1625
1626 if asetup:
1627 wfile = wrapper
1628 asetupFile = None
1629 # if the substep is executed within a container, setup athena with a separate script
1630 # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
1631 if self._containerSetup is not None:
1632 asetupFile = open(self._setupFile, 'w')
1633 wfile = asetupFile
1634 print(f'source ./{setupATLAS} -q', file=wfile)
1635 print(f'asetup {asetup}', file=wfile)
1636 print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1637 if dbsetup:
1638 dbroot = path.dirname(dbsetup)
1639 dbversion = path.basename(dbroot)
1640 print("# DBRelease setup", file=wrapper)
1641 print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1642 print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1643 print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1644 print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1645 print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
1646 print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1647 if self._disableMT:
1648 print("# AthenaMT explicitly disabled for this executor", file=wrapper)
1649 if self._disableMP:
1650 print("# AthenaMP explicitly disabled for this executor", file=wrapper)
1651 if self._envUpdate.len > 0:
1652 for envSetting in self._envUpdate.values:
1653 if not envSetting.startswith('LD_PRELOAD'):
1654 print("export", envSetting, file=wrapper)
1655 # If Valgrind is engaged, a serialised Athena configuration file
1656 # is generated for use with a subsequent run of Athena with
1657 # Valgrind.
1658 if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1659 msg.info('Valgrind engaged')
1660 # Define the file name of the serialised Athena
1661 # configuration.
1662 AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1663 name = self._name
1664 )
1665 # Run Athena for generation of its serialised configuration.
1666 print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1667 print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1668 # Generate a Valgrind command, suppressing or ussing default
1669 # options as requested and extra options as requested.
1670 if 'valgrindDefaultOpts' in self.conf._argdict:
1671 defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
1672 else:
1673 defaultOptions = True
1674 if 'valgrindExtraOpts' in self.conf._argdict:
1675 extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
1676 else:
1677 extraOptionsList = None
1678 msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
1679 msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1680 command = ValgrindCommand(
1681 defaultOptions = defaultOptions,
1682 extraOptionsList = extraOptionsList,
1683 AthenaSerialisedConfigurationFile = \
1684 AthenaSerialisedConfigurationFile
1685 )
1686 msg.debug("Valgrind command: {command}".format(command = command))
1687 print(command, file=wrapper)
1688 # If VTune is engaged, a serialised Athena configuration file
1689 # is generated for use with a subsequent run of Athena with
1690 # VTune.
1691 elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
1692 msg.info('VTune engaged')
1693 # Define the file name of the serialised Athena
1694 # configuration.
1695 AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1696 name = self._name
1697 )
1698 # Run Athena for generation of its serialised configuration.
1699 print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1700 print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1701 # Generate a VTune command, suppressing or using default
1702 # options as requested and extra options as requested.
1703 if 'vtuneDefaultOpts' in self.conf._argdict:
1704 defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
1705 else:
1706 defaultOptions = True
1707 if 'vtuneExtraOpts' in self.conf._argdict:
1708 extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
1709 else:
1710 extraOptionsList = None
1711
1712 # replace the _topOptionsFiles from the Athena command with the AthenaSerialisedConfigurationFile.
1713 if (self._skeleton or self._skeletonCA) and len(self._topOptionsFiles) > 0:
1714 AthenaCommand = self._cmd[:-len(self._topOptionsFiles)]
1715 else:
1716 AthenaCommand = self._cmd
1717 AthenaCommand.append(AthenaSerialisedConfigurationFile)
1718
1719 msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
1720 msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
1721 command = VTuneCommand(
1722 defaultOptions = defaultOptions,
1723 extraOptionsList = extraOptionsList,
1724 AthenaCommand = AthenaCommand
1725 )
1726 msg.debug("VTune command: {command}".format(command = command))
1727 print(command, file=wrapper)
1728 else:
1729 msg.info('Valgrind/VTune not engaged')
1730 # run Athena command
1731 print(' '.join(self._cmd), file=wrapper)
1732 os.chmod(self._wrapperFile, 0o755)
1733 except OSError as e:
1734 errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
1735 fileName = self._wrapperFile,
1736 error = e
1737 )
1738 msg.error(errMsg)
1740 trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
1741 errMsg
1742 )
1743 self._cmd = [ path.join('.', self._wrapperFile) ]
1744 if self._containerSetup is not None:
1745 asetupFile.close()
1746 self._cmd = container_cmd + self._cmd
1747
1748
1749
def _smartMerge(self, fileArg):
    """Merge a file argument's outputs into a smaller number of files.

    Files are grouped greedily: a file joins the current merge group while
    doing so brings the group's total size closer to fileArg.mergeTargetSize
    (a negative target means "merge everything"). Each group is then merged
    with the argFile's own selfMerge() method and the resulting merger is
    recorded in self._myMerger.

    Fix: corrected the grammar of the "cannot be merged" log message and
    replaced the zip/range(len()) counter idiom with enumerate.

    @param fileArg File argument whose current values are the files to merge.
    @return None. Returns early (no merging) if the argFile has no
            selfMerge() method, the merge target size is 0, or file size
            metadata is unavailable.
    """
    if 'selfMerge' not in dir(fileArg):
        msg.info('Files in {0} cannot be merged (no selfMerge() method is implemented)'.format(fileArg.name))
        return

    if fileArg.mergeTargetSize == 0:
        msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
        return

    # First pass: greedily partition the input files into merge groups
    mergeCandidates = [list()]
    currentMergeSize = 0
    for fname in fileArg.value:
        size = fileArg.getSingleMetadata(fname, 'file_size')
        if not isinstance(size, int):
            msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
            return
        # if there is no file in the group yet, then we must add it
        if len(mergeCandidates[-1]) == 0:
            msg.debug('Adding file {0} to current empty merge list'.format(fname))
            mergeCandidates[-1].append(fname)
            currentMergeSize += size
            continue
        # see if adding this file gets us closer to the target size (but always add if target size is negative)
        if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
            msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
            mergeCandidates[-1].append(fname)
            currentMergeSize += size
            continue
        # close this merge list and start a new one
        msg.debug('Starting a new merge list with file {0}'.format(fname))
        mergeCandidates.append([fname])
        currentMergeSize = size

    msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))

    if len(mergeCandidates) == 1:
        # Merging to a single file, so use the original filename that the transform
        # was started with
        mergeNames = [fileArg.originalName]
    else:
        # Multiple merge targets, so we need a set of unique names
        counter = 0
        mergeNames = []
        for mergeGroup in mergeCandidates:
            # Note that the individual worker files get numbered with 3 digit padding,
            # so these non-padded merges should be fine
            mergeName = fileArg.originalName + '_{0}'.format(counter)
            while path.exists(mergeName):
                counter += 1
                mergeName = fileArg.originalName + '_{0}'.format(counter)
            mergeNames.append(mergeName)
            counter += 1

    # Now actually do the merges
    for counter, (targetName, mergeGroup) in enumerate(zip(mergeNames, mergeCandidates)):
        msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
        if len(mergeGroup) <= 1:
            msg.info('Skip merging for single file')
        else:
            self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
1813
1814
1816 #tgzipping JiveXML files
1817 targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
1818 if os.path.exists(targetTGZName):
1819 os.remove(targetTGZName)
1820
1821 import tarfile
1822 fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")
1823
1824 # force gz compression
1825 tar = tarfile.open(targetTGZName, "w:gz")
1826 for fName in os.listdir('.'):
1827 matches = fNameRE.findall(fName)
1828 if len(matches) > 0:
1829 if fNameRE.findall(fName)[0] == fName:
1830 msg.info('adding %s to %s', fName, targetTGZName)
1831 tar.add(fName)
1832
1833 tar.close()
1834 msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
1835
1836
1837
1839
1840 # Here we validate, but will suppress any errors
def validate(self):
    """Validate the sub-step, but suppress (hold back) any validation failure.

    A TransformValidationException from the base-class validation is caught
    and recorded in self._rc / self._errMsg instead of being propagated, so
    the decision on whether the step really failed can be taken after the
    logfile has been scanned.

    Fix: the 'except trfExceptions.TransformValidationException as e:' line
    was missing from this copy of the file, leaving a bare try block whose
    body referenced 'e'; the handler has been restored.
    """
    self.setValStart()
    try:
        super(optionalAthenaExecutor, self).validate()
    except trfExceptions.TransformValidationException as e:
        # In this case we hold this exception until the logfile has been scanned
        msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
        self._isValidated = False
        self._errMsg = e.errMsg
        self._rc = e.errCode
    self._valStop = os.times()
    msg.debug('valStop time is {0}'.format(self._valStop))
1853
1854
1856
def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
             inData = set(), outData = set(), exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True,
             perfMonFile = None, tryDropAndReload = True, extraRunargs = {},
             manualDataDictionary = None, memMonitor = True):
    """Initialise the hybrid POOL merger athena executor.

    This subclass only fixes the default skeleton and executable used for
    POOL file merging; every argument is forwarded unchanged to the
    athenaExecutor base class.
    """
    forwarded = dict(trf=trf, conf=conf, skeletonFile=skeletonFile, skeletonCA=skeletonCA,
                     inData=inData, outData=outData, exe=exe, exeArgs=exeArgs, substep=substep,
                     inputEventTest=inputEventTest, perfMonFile=perfMonFile,
                     tryDropAndReload=tryDropAndReload, extraRunargs=extraRunargs,
                     manualDataDictionary=manualDataDictionary, memMonitor=memMonitor)
    super(POOLMergeExecutor, self).__init__(name, **forwarded)
1877
def preExecute(self, input = set(), output = set()):
    """Record the pre-execute start time, then delegate the whole setup to
    the athenaExecutor base class."""
    self.setPreExeStart()
    super(POOLMergeExecutor, self).preExecute(input=input, output=output)
1881
1882
def execute(self):
    """Run the merge: the parent athenaExecutor manages the athena
    execution for us, so nothing extra is needed here."""
    super(POOLMergeExecutor, self).execute()
1886
1887
1888
1891
def preExecute(self, input=set(), output=set()):
    """Set up the derivation-framework reduction step.

    For each requested format (the 'formats' argument) a new DAOD (or
    D2AOD) output data type is registered in the configuration's data
    dictionary, with the output file name derived from
    outputDAODFile/outputD2AODFile (honouring any 'augmentations'
    reduction:name mapping). The generic 'DAOD'/'D2AOD' stub entries are
    then removed before handing over to the base-class preExecute.

    Fix: removed an accidentally duplicated pair of
    "Data dictionary is now"/"Input/Output" info messages that were logged
    twice in a row.

    @raises trfExceptions.TransformExecutionException if no 'formats'
            argument was supplied or no base DAOD/D2AOD output was given.
    """
    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
    if 'NTUP_PILEUP' not in output:
        # New derivation framework transform uses "formats"
        if 'formats' not in self.conf.argdict:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
                                                            'No derivation configuration specified')

        if ('DAOD' not in output) and ('D2AOD' not in output):
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
                                                            'No base name for DAOD output')

    formatList = []
    if 'formats' in self.conf.argdict:
        formatList = self.conf.argdict['formats'].value
    for reduction in formatList:
        if ('DAOD' in output):
            dataType = 'DAOD_' + reduction
            if 'augmentations' not in self.conf.argdict:
                outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
            else:
                for val in self.conf.argdict['augmentations'].value:
                    if reduction in val.split(':')[0]:
                        outputName = 'DAOD_' + val.split(':')[1] + '.' + self.conf.argdict['outputDAODFile'].value[0]
                        break
                else:
                    # for/else: no augmentation matched this reduction, use the plain name
                    outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]

        if ('D2AOD' in output):
            dataType = 'D2AOD_' + reduction
            outputName = 'D2AOD_' + reduction + '.' + self.conf.argdict['outputD2AODFile'].value[0]

        msg.info('Adding reduction output type {0}'.format(dataType))
        output.add(dataType)
        newReduction = trfArgClasses.argPOOLFile(outputName, io='output', runarg=True, type='AOD',
                                                 name=reduction)
        # References to _trf - can this be removed?
        self.conf.dataDictionary[dataType] = newReduction

    # Clean up the stub file from the executor input and the transform's data dictionary
    # (we don't remove the actual argFile instance)
    if ('DAOD' in output):
        output.remove('DAOD')
        del self.conf.dataDictionary['DAOD']
        del self.conf.argdict['outputDAODFile']
    if ('D2AOD' in output):
        output.remove('D2AOD')
        del self.conf.dataDictionary['D2AOD']
        del self.conf.argdict['outputD2AODFile']

    msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
    msg.info('Input/Output: {0}/{1}'.format(input, output))
    super(reductionFrameworkExecutor, self).preExecute(input, output)
1949
1950
1951
def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']),
             exe='DQHistogramMerge.py', exeArgs = [], memMonitor = True):
    """Initialise the DQ histogram merge executor.

    The merge is driven by DQHistogramMerge.py, which reads the list of
    input histogram files from a text file prepared in preExecute.
    """
    # Name of the text file listing the histogram files to merge
    self._histMergeList = 'HISTMergeList.txt'
    super(DQMergeExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                          exeArgs=exeArgs, memMonitor=memMonitor)
1960
1961
def preExecute(self, input = set(), output = set()):
    """Build the DQHistogramMerge.py command line.

    Writes the list of input histogram files to self._histMergeList, then
    appends: the list file, the single output file, the
    run_post_processing and is_incremental_merge flags (as literal
    'True'/'False' strings) and any excludeHist/excludeDir options.

    @raises trfExceptions.TransformExecutionException if more or fewer
            than one output file was configured.
    """
    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    super(DQMergeExecutor, self).preExecute(input=input, output=output)

    # Write the list of files to be merged
    with open(self._histMergeList, 'w') as DQMergeFile:
        for dataType in input:
            dataArg = self.conf.dataDictionary[dataType]
            for fname in dataArg.value:
                # Populate event-count metadata as a side effect
                dataArg._getNumberOfEvents([fname])
                print(fname, file=DQMergeFile)

    self._cmd.append(self._histMergeList)

    # Add the output file
    if len(output) != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
    outDataType = list(output)[0]
    self._cmd.append(self.conf.dataDictionary[outDataType].value[0])

    # DQHistogramMerge.py takes literal 'True'/'False' positional flags
    self._cmd.append('True' if self.conf._argdict.get("run_post_processing", False) else 'False')
    self._cmd.append('True' if self.conf._argdict.get("is_incremental_merge", False) else 'False')

    for k in ("excludeHist", "excludeDir"):
        if k in self.conf._argdict:
            self._cmd.append("--{0}={1}".format(k, self.conf._argdict[k]))
1998
1999
def validate(self):
    """Validate the DQ merge step, then scan its logfile for missed errors.

    The worst error found in the logfile decides the outcome: ERRORs are
    tolerated only when ignoreErrors=True was given; anything at ERROR
    level or above otherwise fails validation.

    Fix: this copy of the file had lost the line constructing the logfile
    scanner (leaving 'logScan' undefined) and the 'raise' in the OSError
    handler (leaving a dangling message string); both are restored.

    @raises trfExceptions.TransformValidationException if the logfile could
            not be scanned.
    @raises trfExceptions.TransformLogfileErrorException if a fatal error
            was found in the logfile.
    """
    self.setValStart()
    super(DQMergeExecutor, self).validate()

    exitErrorMessage = ''
    # Base class validation successful, Now scan the logfile for missed errors.
    try:
        logScan = trfValidation.scriptLogFileReport(self._logFileName)
        worstError = logScan.worstError()

        # In general we add the error message to the exit message, but if it's too long then don't do
        # that and just say look in the jobReport
        if worstError['firstError']:
            if len(worstError['firstError']['message']) > logScan._msgLimit:
                exitErrorMessage = "Long {0} message at line {1}" \
                                   " (see jobReport for further details)".format(worstError['level'],
                                                                                 worstError['firstError']['firstLine'])
            else:
                exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
                                                                          worstError['firstError']['message'])
    except OSError as e:
        exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        raise trfExceptions.TransformValidationException(
            exitCode,
            'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))

    if worstError['nLevel'] == stdLogLevels['ERROR'] and (
            'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
        msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')

    elif worstError['nLevel'] >= stdLogLevels['ERROR']:
        self._isValidated = False
        msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
        exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))

    # Must be ok if we got here!
    msg.info('Executor {0} has validated successfully'.format(self.name))
    self._isValidated = True

    self._valStop = os.times()
    msg.debug('valStop time is {0}'.format(self._valStop))
2041
2042
def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']),
             exe='DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor = True):
    """Initialise the DQM post-processing executor.

    The work is delegated to DQM_Tier0Wrapper_tf.py, driven by an argument
    JSON file assembled in preExecute.
    """
    # Name of the text file listing the histogram files to merge
    self._histMergeList = 'HISTMergeList.txt'
    super(DQMPostProcessExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                                 exeArgs=exeArgs, memMonitor=memMonitor)
2051
2052
def preExecute(self, input = set(), output = set()):
    """Assemble the argument JSON for DQM_Tier0Wrapper_tf.py.

    Collects the input histogram files (as 'dataset#file' strings), the
    single output file and the post-processing flags into a dictionary
    that is dumped to args.json; the wrapper is then invoked with
    --argJSON=args.json.

    @raises trfExceptions.TransformExecutionException if more or fewer
            than one output file was configured.
    """
    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    super(DQMPostProcessExecutor, self).preExecute(input=input, output=output)

    # build input file list (typically only one):
    dsName = self.conf.argdict["inputHISTFile"].dataset
    inputList = []
    for dataType in input:
        for fname in self.conf.dataDictionary[dataType].value:
            # if no dataset name is given, guess it from the file name
            if not dsName:
                dsName = ".".join(fname.split('.')[0:4])
            inputList.append("#".join([dsName, fname]))

    if len(output) != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
    outDataType = list(output)[0]
    # build argument json:
    # T0 uses keys: 'allowCOOLUpload', 'doWebDisplay', 'incrementalMode', 'inputHistFiles', 'mergeParams', 'outputHistFile', 'postProcessing', 'reverseFileOrder', 'servers'
    # more keys: runNumber, streamName, projectTag, filepaths, productionMode, skipMerge
    wrapperParams = {
        "inputHistFiles": inputList,
        "outputHistFile": dsName + "#" + self.conf.dataDictionary[outDataType].value[0],
        "incrementalMode": "True" if self.conf._argdict.get("is_incremental_merge", False) else "False",
        "postProcessing": "True" if self.conf._argdict.get("run_post_processing", False) else "False",
        "doWebDisplay": "True" if self.conf._argdict.get("doWebDisplay", False) else "False",
        "allowCOOLUpload": "True" if self.conf._argdict.get("allowCOOLUpload", False) else "False",
        "mergeParams": "",
    }
    if "servers" in self.conf._argdict:
        # NOTE(review): the T0 key list above says 'servers' but the key
        # set here is 'server' - confirm which spelling the wrapper expects.
        wrapperParams["server"] = self.conf._argdict["servers"]

    for k in ("excludeHist", "excludeDir"):
        if k in self.conf._argdict:
            wrapperParams["mergeParams"] += (" --{0}={1}".format(k, self.conf._argdict[k]))

    with open("args.json", "w") as f:
        json.dump(wrapperParams, f)

    self._cmd.append("--argJSON=args.json")
2097
2098
2099
def validate(self):
    """Validate the DQM post-processing step, then scan its logfile.

    The worst error found in the logfile decides the outcome: ERRORs are
    tolerated only when ignoreErrors=True was given; anything at ERROR
    level or above otherwise fails validation.

    Fix: this copy of the file had lost the line constructing the logfile
    scanner (leaving 'logScan' undefined) and the 'raise' in the OSError
    handler (leaving a dangling message string); both are restored.

    @raises trfExceptions.TransformValidationException if the logfile could
            not be scanned.
    @raises trfExceptions.TransformLogfileErrorException if a fatal error
            was found in the logfile.
    """
    self.setValStart()
    super(DQMPostProcessExecutor, self).validate()

    exitErrorMessage = ''
    # Base class validation successful, Now scan the logfile for missed errors.
    try:
        logScan = trfValidation.scriptLogFileReport(self._logFileName)
        worstError = logScan.worstError()

        # In general we add the error message to the exit message, but if it's too long then don't do
        # that and just say look in the jobReport
        if worstError['firstError']:
            if len(worstError['firstError']['message']) > logScan._msgLimit:
                exitErrorMessage = "Long {0} message at line {1}" \
                                   " (see jobReport for further details)".format(worstError['level'],
                                                                                 worstError['firstError']['firstLine'])
            else:
                exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
                                                                          worstError['firstError']['message'])
    except OSError as e:
        exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        raise trfExceptions.TransformValidationException(
            exitCode,
            'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))

    if worstError['nLevel'] == stdLogLevels['ERROR'] and (
            'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
        msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')

    elif worstError['nLevel'] >= stdLogLevels['ERROR']:
        self._isValidated = False
        msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
        exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))

    # Must be ok if we got here!
    msg.info('Executor {0} has validated successfully'.format(self.name))
    self._isValidated = True

    self._valStop = os.times()
    msg.debug('valStop time is {0}'.format(self._valStop))
2141
2143
def preExecute(self, input = set(), output = set()):
    """Build the hadd command line for NTUPLE merging.

    Uses 'hadd -f' (unless another executable was configured), followed by
    the single output file and then every input file.

    @raises trfExceptions.TransformExecutionException if more or fewer
            than one output file was configured.
    """
    self.setPreExeStart()
    msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    # Basic command, and allow overwrite of the output file
    if self._exe is None:
        self._exe = 'hadd'
    self._cmd = [self._exe, "-f"]

    # Add the output file
    if len(output) != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
    outDataType = next(iter(output))
    self._cmd.append(self.conf.dataDictionary[outDataType].value[0])
    # Append every file to be merged to the command chain
    for dataType in input:
        self._cmd.extend(self.conf.dataDictionary[dataType].value)

    super(NTUPMergeExecutor, self).preExecute(input=input, output=output)
2165
2166
2168 """Executor for running physvalPostProcessing.py with <input> <output> args"""
2169
def preExecute(self, input = set(), output = set()):
    """Build the command line: the configured executable followed by the
    single input file and the single output file.

    @raises trfExceptions.TransformExecutionException unless exactly one
            input and one output were configured.
    """
    self.setPreExeStart()
    msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    self._cmd = [self.exe]

    if len(input) != 1 or len(output) != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                        f'Exactly one input and one output must be specified (got inputs={len(input)}, outputs={len(output)})')

    inDataType = next(iter(input))
    outDataType = next(iter(output))
    self._cmd.append(self.conf.dataDictionary[inDataType].value[0])
    self._cmd.append(self.conf.dataDictionary[outDataType].value[0])

    # Finalize execution setup by calling the parent class method
    super(NtupPhysValPostProcessingExecutor, self).preExecute(input=input, output=output)
2185
2186
2188
def preExecute(self, input = set(), output = set()):
    """Prepare the bytestream merge step.

    Optionally masks empty input files (maskEmptyInputs), writes the list
    of remaining inputs to a text file for the merge executable, and works
    out the output filename convention: the merger always appends
    '._0001.data', so either the configured name already ends that way, or
    a rename is performed in postExecute when allowRename is set.

    Fix: self._maskedFiles is now initialised unconditionally - it is read
    when writing the file list even when input masking is not requested,
    which previously raised AttributeError.

    @raises trfExceptions.TransformExecutionException on an unusable
            configuration, all-empty inputs without a stub file, or failure
            to write the file list.
    """
    self.setPreExeStart()
    self._inputBS = list(input)[0]
    self._outputBS = list(output)[0]
    # Always initialise - consulted below even when masking is disabled
    self._maskedFiles = []
    self._useStubFile = False
    if 'maskEmptyInputs' in self.conf.argdict and self.conf.argdict['maskEmptyInputs'].value is True:
        eventfullFiles = []
        for fname in self.conf.dataDictionary[self._inputBS].value:
            nEvents = self.conf.dataDictionary[self._inputBS].getSingleMetadata(fname, 'nentries')
            msg.debug('Found {0} events in file {1}'.format(nEvents, fname))
            # Only count files whose event count is known and positive
            if isinstance(nEvents, int) and nEvents > 0:
                eventfullFiles.append(fname)
        self._maskedFiles = list(set(self.conf.dataDictionary[self._inputBS].value) - set(eventfullFiles))
        if len(self._maskedFiles) > 0:
            msg.info('The following input files are masked because they have 0 events: {0}'.format(' '.join(self._maskedFiles)))
        if len(eventfullFiles) == 0:
            if 'emptyStubFile' in self.conf.argdict and path.exists(self.conf.argdict['emptyStubFile'].value):
                self._useStubFile = True
                msg.info("All input files are empty - will use stub file {0} as output".format(self.conf.argdict['emptyStubFile'].value))
            else:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
                                                                'All input files had zero events - aborting BS merge')

    # Write the list of input files to a text file, so that testMergedFiles can swallow it
    self._mergeBSFileList = '{0}.list'.format(self._name)
    self._mergeBSLogfile = '{0}.out'.format(self._name)
    try:
        with open(self._mergeBSFileList, 'w') as BSFileList:
            for fname in self.conf.dataDictionary[self._inputBS].value:
                if fname not in self._maskedFiles:
                    print(fname, file=BSFileList)
    except OSError as e:
        errMsg = 'Got an error when writing list of BS files to {0}: {1}'.format(self._mergeBSFileList, e)
        msg.error(errMsg)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'), errMsg)

    # Hope that we were given a correct filename...
    self._outputFilename = self.conf.dataDictionary[self._outputBS].value[0]
    if self._outputFilename.endswith('._0001.data'):
        self._doRename = False
        self._outputFilename = self._outputFilename.split('._0001.data')[0]
    elif self.conf.argdict['allowRename'].value is True:
        # OK, non-fatal, we go for a renaming
        msg.info('Output filename does not end in "._0001.data" will proceed, but be aware that the internal filename metadata will be wrong')
        self._doRename = True
    else:
        # No rename allowed, so we are dead...
        errmsg = 'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'), errmsg)

    # Set the correct command for execution
    self._cmd = [self._exe, self._mergeBSFileList, '0', self._outputFilename]

    super(bsMergeExecutor, self).preExecute(input=input, output=output)
2244
def execute(self):
    """Run the merge, or fake it when a stub file stands in for the output
    (all inputs were empty and maskEmptyInputs selected a stub)."""
    if not self._useStubFile:
        super(bsMergeExecutor, self).execute()
        return
    # Need to fake execution!
    self._exeStart = os.times()
    msg.debug('exeStart time is {0}'.format(self._exeStart))
    msg.info("Using stub file for empty BS output - execution is fake")
    stub = self.conf.argdict['emptyStubFile'].value
    if self._outputFilename != stub:
        os.rename(stub, self._outputFilename)
    self._memMonitor = False
    self._hasExecuted = True
    self._rc = 0
    self._exeStop = os.times()
    msg.debug('exeStop time is {0}'.format(self._exeStop))
2260
def postExecute(self):
    """If a rename was flagged in preExecute, move the merger's
    '._0001.data' output to the requested name, then run the base-class
    post-execution steps.

    @raises trfExceptions.TransformExecutionException if the rename fails.
    """
    if self._useStubFile:
        # Stub output is already in place; nothing to rename
        pass
    elif self._doRename:
        self._expectedOutput = self._outputFilename + '._0001.data'
        target = self.conf.dataDictionary[self._outputBS].value[0]
        msg.info('Renaming {0} to {1}'.format(self._expectedOutput, target))
        try:
            os.rename(self._outputFilename + '._0001.data', target)
        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                            'Exception raised when renaming {0} to {1}: {2}'.format(self._outputFilename, target, e))
    super(bsMergeExecutor, self).postExecute()
2273
2274
2275
2277
def preExecute(self, input = set(), output = set()):
    """Prepare the archive/unarchive step.

    For exe == 'zip' a zip_wrapper.py script is generated that adds the
    input files to the output archive (unpacking any input that is itself
    a zip file first) and deletes inputs as soon as they are stored, to
    save disk space on the worker node. For exe == 'unarchive' an
    unarchive_wrapper.py script is generated that extracts all input
    archives to the configured path. The wrapper is then run via 'python'.

    Fix: the generated scripts previously contained Python 2 'print'
    statements (e.g. print 'Zipping ...'), which are a syntax error under
    the Python 3 interpreter that executes them; they are now emitted as
    print() calls with consistent indentation.

    @raises trfExceptions.TransformExecutionException if the output name is
            missing, an input is not a zip archive, or a wrapper script
            cannot be written.
    """
    self.setPreExeStart()
    self._memMonitor = False

    # archiving
    if self._exe == 'zip':
        if 'outputArchFile' not in self.conf.argdict:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_MISSING'), 'Missing output file name')

        self._cmd = ['python']
        try:
            with open('zip_wrapper.py', 'w') as zip_wrapper:
                print("import zipfile, os, shutil", file=zip_wrapper)
                if os.path.exists(self.conf.argdict['outputArchFile'].value[0]):
                    # appending input file(s) to existing archive
                    print("zf = zipfile.ZipFile('{}', mode='a', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
                else:
                    # creating new archive
                    print("zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
                print("for f in {}:".format(self.conf.argdict['inputDataFile'].value), file=zip_wrapper)
                # zipfile.is_zipfile gives false positives (as of python 3.7.0),
                # so additionally require '.zip' in the file name
                print("    if zipfile.is_zipfile(f) and '.zip' in f:", file=zip_wrapper)
                print("        archive = zipfile.ZipFile(f, mode='r')", file=zip_wrapper)
                print("        print('Extracting input zip file {0} to temporary directory {1}'.format(f,'tmp'))", file=zip_wrapper)
                print("        archive.extractall('tmp')", file=zip_wrapper)
                print("        archive.close()", file=zip_wrapper)
                # remove stuff as soon as it is saved to output in order to save disk space at worker node
                print("        if os.access(f, os.F_OK):", file=zip_wrapper)
                print("            print('Removing input zip file {}'.format(f))", file=zip_wrapper)
                print("            os.unlink(f)", file=zip_wrapper)
                print("        if os.path.isdir('tmp'):", file=zip_wrapper)
                print("            for root, dirs, files in os.walk('tmp'):", file=zip_wrapper)
                print("                for name in files:", file=zip_wrapper)
                print("                    print('Zipping {}'.format(name))", file=zip_wrapper)
                print("                    zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
                print("            shutil.rmtree('tmp')", file=zip_wrapper)
                print("    else:", file=zip_wrapper)
                print("        print('Zipping {}'.format(os.path.basename(f)))", file=zip_wrapper)
                print("        zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
                print("        if os.access(f, os.F_OK):", file=zip_wrapper)
                print("            print('Removing input file {}'.format(f))", file=zip_wrapper)
                print("            os.unlink(f)", file=zip_wrapper)
                print("zf.close()", file=zip_wrapper)
            os.chmod('zip_wrapper.py', 0o755)
        except OSError as e:
            errMsg = 'error writing zip wrapper {fileName}: {error}'.format(fileName = 'zip_wrapper.py',
                                                                            error = e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
                                                            errMsg)
        self._cmd.append('zip_wrapper.py')

    # unarchiving
    elif self._exe == 'unarchive':
        import zipfile
        for infile in self.conf.argdict['inputArchFile'].value:
            if not zipfile.is_zipfile(infile):
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
                                                                'An input file is not a zip archive - aborting unpacking')
        self._cmd = ['python']
        try:
            with open('unarchive_wrapper.py', 'w') as unarchive_wrapper:
                print("import zipfile", file=unarchive_wrapper)
                print("for f in {}:".format(self.conf.argdict['inputArchFile'].value), file=unarchive_wrapper)
                print("    archive = zipfile.ZipFile(f, mode='r')", file=unarchive_wrapper)
                print("    path = '{}'".format(self.conf.argdict['path']), file=unarchive_wrapper)
                print("    print('Extracting archive {0} to {1}'.format(f,path))", file=unarchive_wrapper)
                print("    archive.extractall(path)", file=unarchive_wrapper)
                print("    archive.close()", file=unarchive_wrapper)
            os.chmod('unarchive_wrapper.py', 0o755)
        except OSError as e:
            errMsg = 'error writing unarchive wrapper {fileName}: {error}'.format(fileName = 'unarchive_wrapper.py',
                                                                                  error = e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
                                                            errMsg)
        self._cmd.append('unarchive_wrapper.py')
    super(archiveExecutor, self).preExecute(input=input, output=output)
2361
static Double_t rc
void print(char *figname, TCanvas *c1)
#define min(a, b)
Definition cfImp.cxx:40
Argument class for substep lists, suitable for preExec/postExec.
Class holding the update to an environment that will be passed on to an executor.
Definition trfEnv.py:15
Base class for execution exceptions.
Exception class for validation failures detected by parsing logfiles.
Specialist execution class for DQM post-processing of histograms.
Definition trfExe.py:2043
__init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']), exe='DQM_Tier0Wrapper_tf.py', exeArgs=[], memMonitor=True)
Definition trfExe.py:2045
preExecute(self, input=set(), output=set())
Definition trfExe.py:2053
Specialist execution class for merging DQ histograms.
Definition trfExe.py:1952
__init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']), exe='DQHistogramMerge.py', exeArgs=[], memMonitor=True)
Definition trfExe.py:1954
preExecute(self, input=set(), output=set())
Definition trfExe.py:1962
Specialist execution class for merging NTUPLE files.
Definition trfExe.py:2142
preExecute(self, input=set(), output=set())
Definition trfExe.py:2144
Specialist execution class for running post processing on merged PHYVAL NTUPLE file.
Definition trfExe.py:2167
preExecute(self, input=set(), output=set())
Definition trfExe.py:2170
preExecute(self, input=set(), output=set())
Definition trfExe.py:1878
__init__(self, name='hybridPOOLMerge', trf=None, conf=None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton', inData=set(), outData=set(), exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, manualDataDictionary=None, memMonitor=True)
Initialise hybrid POOL merger athena executor.
Definition trfExe.py:1870
Archive transform.
Definition trfExe.py:2276
preExecute(self, input=set(), output=set())
Definition trfExe.py:2278
_writeAthenaWrapper(self, asetup=None, dbsetup=None, ossetup=None)
Write a wrapper script which runs asetup and then Athena.
Definition trfExe.py:1574
_smartMerge(self, fileArg)
Manage smart merging of output files.
Definition trfExe.py:1751
preExecute(self, input=set(), output=set())
Definition trfExe.py:976
__init__(self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
Initialise athena executor.
Definition trfExe.py:889
_prepAthenaCommandLine(self)
Prepare the correct command line to be used to invoke athena.
Definition trfExe.py:1457
_isCAEnabled(self)
Check if running with CA.
Definition trfExe.py:1437
_skeletonCA
Handle MPI setup.
Definition trfExe.py:905
_tryDropAndReload
Add --drop-and-reload if possible (and allowed!).
Definition trfExe.py:893
_envUpdate
Look for environment updates and prepare the athena command line.
Definition trfExe.py:1229
Specialise the script executor to deal with the BS merge oddity of excluding empty DRAWs.
Definition trfExe.py:2187
preExecute(self, input=set(), output=set())
Definition trfExe.py:2189
__init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Definition trfExe.py:596
__init__(self, name='Echo', trf=None)
Definition trfExe.py:574
setFromTransform(self, trf)
Set configuration properties from the parent transform.
Definition trfExe.py:113
addToArgdict(self, key, value)
Add a new object to the argdict.
Definition trfExe.py:118
addToDataDictionary(self, key, value)
Add a new object to the dataDictionary.
Definition trfExe.py:122
__init__(self, argdict={}, dataDictionary={}, firstExecutor=False)
Configuration for an executor.
Definition trfExe.py:63
Special executor that will enable a logfile scan as part of its validation.
Definition trfExe.py:506
__init__(self, name='Logscan')
Definition trfExe.py:507
preExecute(self, input=set(), output=set())
Definition trfExe.py:512
Athena executor where failure is not considered fatal.
Definition trfExe.py:1838
Specialist executor to manage the handling of multiple implicit input and output files within the der...
Definition trfExe.py:1890
preExecute(self, input=set(), output=set())
Take inputDAODFile and setup the actual outputs needed in this job.
Definition trfExe.py:1893
preExecute(self, input=set(), output=set())
Definition trfExe.py:658
__init__(self, name='Script', trf=None, conf=None, inData=set(), outData=set(), exe=None, exeArgs=None, memMonitor=True)
Definition trfExe.py:621
Executors always only ever execute a single step, as seen by the transform.
Definition trfExe.py:127
myMerger(self)
Now define properties for these data members.
Definition trfExe.py:207
__init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Base class initialiser for transform executors.
Definition trfExe.py:138
conf
Executor configuration:
Definition trfExe.py:163
preExecute(self, input=set(), output=set())
Definition trfExe.py:471
doAll(self, input=set(), output=set())
Convenience function.
Definition trfExe.py:499
Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGL...
Small class used for validating event counts between input and output files.
Class of patterns that can be ignored from athena logfiles.
STL class.
std::vector< std::string > intersection(std::vector< std::string > &v1, std::vector< std::string > &v2)
bool add(const std::string &hname, TKey *tobj)
Definition fastadd.cxx:55
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
Transform argument class definitions.
Module for transform exit codes.
Contains functions related to Athena Job Options files.
Logging configuration for ATLAS job transforms.
Utilities for handling MPI-Athena jobs.
Utilities for handling AthenaMP jobs.
Utilities for handling AthenaMT jobs.
Transform utility functions.
Validation control for job transforms.
Definition index.py:1
_encoding_stream(s)
Definition trfExe.py:45