ATLAS Offline Software
Loading...
Searching...
No Matches
trfExe.py
Go to the documentation of this file.
1# Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
2
8
9import json
10import math
11import os
12import os.path as path
13import re
14import signal
15import subprocess
16import sys
17import time
18
19import logging
20from fnmatch import fnmatch
21msg = logging.getLogger(__name__)
22
23from PyJobTransforms.trfJobOptions import JobOptionsTemplate
24from PyJobTransforms.trfUtils import asetupReport, asetupReleaseIsOlderThan, unpackDBRelease, setupDBRelease, \
25 cvmfsDBReleaseCheck, forceToAlphaNum, \
26 ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
27from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix
28from PyJobTransforms.trfExitCodes import trfExit
29from PyJobTransforms.trfLogger import stdLogLevels
30from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler
31from PyJobTransforms.trfMTTools import detectAthenaMTThreads
32
33import PyJobTransforms.trfExceptions as trfExceptions
34import PyJobTransforms.trfValidation as trfValidation
35import PyJobTransforms.trfArgClasses as trfArgClasses
36import PyJobTransforms.trfEnv as trfEnv
38
39
40# Depending on the setting of LANG, sys.stdout may end up with ascii or ansi
41# encoding, rather than utf-8. But Athena uses unicode for some log messages
42# (another example of Gell-Mann's totalitarian principle) and that will result
43# in a crash with python 3. In such a case, force the use of a utf-8 encoded
44# output stream instead.
46 enc = s.encoding.lower()
47 if enc.find('ascii') >= 0 or enc.find('ansi') >= 0:
48 return open (s.fileno(), 'w', encoding='utf-8')
49 return s
50
51
52
class executorConfig(object):
    """Configuration handed to a transform executor.

    Holds the argument dictionary, the data dictionary, whether this is the first
    executor, and the executor's step bookkeeping.
    """

    def __init__(self, argdict=None, dataDictionary=None, firstExecutor=False):
        """
        @param argdict Argument dictionary for this executor (a new empty dict if omitted)
        @param dataDictionary Data dictionary for this executor (a new empty dict if omitted)
        @param firstExecutor True if this is the first executor to run
        """
        # A fresh dict per instance: the former shared ``{}`` defaults were mutated by
        # addToArgdict/addToDataDictionary, leaking entries between unrelated configs.
        self._argdict = {} if argdict is None else argdict
        self._dataDictionary = {} if dataDictionary is None else dataDictionary
        self._firstExecutor = firstExecutor
        self._executorStep = -1
        self._totalExecutorSteps = 0

    @property
    def argdict(self):
        return self._argdict

    @argdict.setter
    def argdict(self, value):
        self._argdict = value

    @property
    def dataDictionary(self):
        return self._dataDictionary

    @dataDictionary.setter
    def dataDictionary(self, value):
        self._dataDictionary = value

    @property
    def firstExecutor(self):
        return self._firstExecutor

    @firstExecutor.setter
    def firstExecutor(self, value):
        self._firstExecutor = value

    @property
    def executorStep(self):
        return self._executorStep

    @executorStep.setter
    def executorStep(self, value):
        self._executorStep = value

    @property
    def totalExecutorSteps(self):
        return self._totalExecutorSteps

    @totalExecutorSteps.setter
    def totalExecutorSteps(self, value):
        self._totalExecutorSteps = value

    def setFromTransform(self, trf):
        """Take the argument and data dictionaries (by reference) from transform @c trf."""
        self._argdict = trf.argdict
        self._dataDictionary = trf.dataDictionary

    def addToArgdict(self, key, value):
        """Set @c key to @c value in the argument dictionary."""
        self._argdict[key] = value

    def addToDataDictionary(self, key, value):
        """Set @c key to @c value in the data dictionary."""
        self._dataDictionary[key] = value
124
125
126
class transformExecutor(object):
    """Base class for transform executors.

    An executor starts from some data types (inData) and produces others (outData).
    This base implementation does no real work: each stage logs, records os.times()
    snapshots and sets the execution/validation status flags. Subclasses override
    the stages they need.
    """

    def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
        """
        @param name Executor name (passed through forceToAlphaNum)
        @param trf Parent transform, used to fill a new executorConfig when @c conf is None
        @param conf executorConfig instance to use; a new one is created when None
        @param inData Data types this executor can start from ('NULL' becomes 'inNULL')
        @param outData Data types this executor produces ('NULL' becomes 'outNULL')
        @raises trfExceptions.TransformSetupException if a data type is both consumed and produced
        """
        # Some information to produce helpful log messages
        self._name = forceToAlphaNum(name)

        # Data this executor can start from and produce. The arguments are copied, so the
        # default sets are never mutated. NULL is mapped to inNULL/outNULL as a convenience.
        self._inData = set(inData)
        self._outData = set(outData)
        if 'NULL' in self._inData:
            self._inData.remove('NULL')
            self._inData.add('inNULL')
        if 'NULL' in self._outData:
            self._outData.remove('NULL')
            self._outData.add('outNULL')

        # It's forbidden for an executor to consume and produce the same datatype
        dataOverlap = self._inData & self._outData
        if dataOverlap:
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
                                                        'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self._name, ' '.join(dataOverlap)))

        # Configuration: use the one given, else build one (from the transform, if any)
        if conf is not None:
            self.conf = conf
        else:
            self.conf = executorConfig()
            if trf is not None:
                self.conf.setFromTransform(trf)

        # Execution status
        self._hasExecuted = False
        self._rc = -1
        self._errMsg = None

        # Validation status
        self._hasValidated = False
        self._isValidated = False

        # Extra metadata for this executor, provided in job reports
        self._extraMetadata = {}

        # Placeholders for timing (os.times() snapshots) and resource-monitoring results
        self._preExeStart = None
        self._exeStart = self._exeStop = None
        self._valStart = self._valStop = None
        self._memStats = {}
        self._memLeakResult = {}
        self._memFullFile = None
        self._eventCount = None
        self._athenaMP = 0
        self._athenaMT = 0
        self._athenaConcurrentEvents = 0
        self._dbMonitor = None
        self._resimevents = None
        self._alreadyInContainer = None
        self._containerSetup = None

        # Holder for execution information about any merges done by this executor in MP mode
        self._myMerger = []

    @staticmethod
    def _interval(calc, start, stop):
        """Return calc(start, stop) when both os.times() snapshots are set, else None."""
        if start and stop:
            return calc(start, stop)
        return None

    @property
    def myMerger(self):
        return self._myMerger

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def substep(self):
        """Substep name, or None if this executor has none."""
        return getattr(self, '_substep', None)

    @property
    def trf(self):
        """Parent transform, or None if it was never set."""
        return getattr(self, '_trf', None)

    @trf.setter
    def trf(self, value):
        self._trf = value

    @property
    def inData(self):
        """Set of input data types, or None if unset."""
        return getattr(self, '_inData', None)

    @inData.setter
    def inData(self, value):
        self._inData = set(value)

    def inDataUpdate(self, value):
        """Add the data types in @c value to inData, creating the set if needed."""
        if hasattr(self, '_inData'):
            self._inData.update(value)
        else:
            self.inData = value

    @property
    def outData(self):
        """Set of output data types, or None if unset."""
        return getattr(self, '_outData', None)

    @outData.setter
    def outData(self, value):
        self._outData = set(value)

    def outDataUpdate(self, value):
        """Add the data types in @c value to outData, creating the set if needed."""
        if hasattr(self, '_outData'):
            self._outData.update(value)
        else:
            self.outData = value

    @property
    def input(self):
        """Inputs actually used for this execution (set by a subclass preExecute), else None."""
        return getattr(self, '_input', None)

    @property
    def output(self):
        """Outputs actually produced by this execution (set by a subclass preExecute), else None."""
        return getattr(self, '_output', None)

    @property
    def extraMetadata(self):
        return self._extraMetadata

    @property
    def hasExecuted(self):
        return self._hasExecuted

    @property
    def rc(self):
        return self._rc

    @property
    def errMsg(self):
        return self._errMsg

    @property
    def validation(self):
        return self._validation

    @validation.setter
    def validation(self, value):
        self._validation = value

    @property
    def hasValidated(self):
        return self._hasValidated

    @property
    def isValidated(self):
        return self._isValidated

    @property
    def first(self):
        return getattr(self, '_first', None)

    @property
    def preExeStartTimes(self):
        return self._preExeStart

    @property
    def exeStartTimes(self):
        return self._exeStart

    @property
    def exeStopTimes(self):
        return self._exeStop

    @property
    def valStartTimes(self):
        return self._valStart

    @property
    def valStopTimes(self):
        return self._valStop

    @property
    def preExeCpuTime(self):
        return self._interval(calcCpuTime, self._preExeStart, self._exeStart)

    @property
    def preExeWallTime(self):
        return self._interval(calcWallTime, self._preExeStart, self._exeStart)

    @property
    def cpuTime(self):
        return self._interval(calcCpuTime, self._exeStart, self._exeStop)

    @property
    def usrTime(self):
        """Children's user CPU time during execution (os.times() index 2), or None."""
        if self._exeStart and self._exeStop:
            return self._exeStop[2] - self._exeStart[2]
        return None

    @property
    def sysTime(self):
        """Children's system CPU time during execution (os.times() index 3), or None."""
        if self._exeStart and self._exeStop:
            return self._exeStop[3] - self._exeStart[3]
        return None

    @property
    def wallTime(self):
        return self._interval(calcWallTime, self._exeStart, self._exeStop)

    @property
    def memStats(self):
        return self._memStats

    @property
    def memAnalysis(self):
        return self._memLeakResult

    @property
    def postExeCpuTime(self):
        return self._interval(calcCpuTime, self._exeStop, self._valStart)

    @property
    def postExeWallTime(self):
        return self._interval(calcWallTime, self._exeStop, self._valStart)

    @property
    def validationCpuTime(self):
        return self._interval(calcCpuTime, self._valStart, self._valStop)

    @property
    def validationWallTime(self):
        return self._interval(calcWallTime, self._valStart, self._valStop)

    @property
    def cpuTimeTotal(self):
        return self._interval(calcCpuTime, self._preExeStart, self._valStop)

    @property
    def wallTimeTotal(self):
        return self._interval(calcWallTime, self._preExeStart, self._valStop)

    @property
    def eventCount(self):
        return self._eventCount

    @property
    def reSimEvent(self):
        return self._resimevents

    @property
    def athenaMP(self):
        return self._athenaMP

    @property
    def dbMonitor(self):
        return self._dbMonitor

    # set start times, if not set already
    def setPreExeStart(self):
        if self._preExeStart is None:
            self._preExeStart = os.times()
            msg.debug('preExeStart time is {0}'.format(self._preExeStart))

    def setValStart(self):
        if self._valStart is None:
            self._valStart = os.times()
            msg.debug('valStart time is {0}'.format(self._valStart))

    def preExecute(self, input = set(), output = set()):
        """Record the pre-execution start time; no other work in the base class."""
        self.setPreExeStart()
        msg.info('Preexecute for %s', self._name)

    def execute(self):
        """Dummy execution: record timings and report success (rc 0)."""
        self._exeStart = os.times()
        msg.debug('exeStart time is {0}'.format(self._exeStart))
        msg.info('Starting execution of %s', self._name)
        self._hasExecuted = True
        self._rc = 0
        self._errMsg = ''
        msg.info('%s executor returns %d', self._name, self._rc)
        self._exeStop = os.times()
        # This logs the execution stop time (the message previously said 'preExeStop')
        msg.debug('exeStop time is {0}'.format(self._exeStop))

    def postExecute(self):
        msg.info('Postexecute for %s', self._name)

    def validate(self):
        """No-op validation: always marks the executor as validated."""
        self.setValStart()
        self._hasValidated = True
        msg.info('Executor %s has no validation function - assuming all ok', self._name)
        self._isValidated = True
        self._errMsg = ''
        self._valStop = os.times()
        msg.debug('valStop time is {0}'.format(self._valStop))

    def doAll(self, input=set(), output=set()):
        """Run preExecute, execute, postExecute and validate in order."""
        self.preExecute(input, output)
        self.execute()
        self.postExecute()
        self.validate()
504
505
507 def __init__(self, name = 'Logscan'):
508 super(logscanExecutor, self).__init__(name=name)
509 self._errorMaskFiles = None
510 self._logFileName = None
511
512 def preExecute(self, input = set(), output = set()):
513 self.setPreExeStart()
514 msg.info('Preexecute for %s', self._name)
515 if 'logfile' in self.conf.argdict:
516 self._logFileName = self.conf.argdict['logfile'].value
517
518 def validate(self):
519 self.setValStart()
520 msg.info("Starting validation for {0}".format(self._name))
521 if self._logFileName:
522
524 if 'ignorePatterns' in self.conf.argdict:
525 igPat = self.conf.argdict['ignorePatterns'].value
526 else:
527 igPat = []
528 if 'ignoreFiles' in self.conf.argdict:
529 ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
530 elif self._errorMaskFiles is not None:
531 ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
532 else:
533 ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)
534
535 # Now actually scan my logfile
536 msg.info('Scanning logfile {0} for errors'.format(self._logFileName))
537 self._logScan = trfValidation.athenaLogFileReport(logfile = self._logFileName, ignoreList = ignorePatterns)
538 worstError = self._logScan.worstError()
539
540 # In general we add the error message to the exit message, but if it's too long then don't do
541 # that and just say look in the jobReport
542 if worstError['firstError']:
543 if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
544 if 'CoreDumpSvc' in worstError['firstError']['message']:
545 exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
546 elif 'G4Exception' in worstError['firstError']['message']:
547 exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
548 else:
549 exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
550 else:
551 exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
552 else:
553 exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
554
555 # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
556 if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
557 msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
558 elif worstError['nLevel'] >= stdLogLevels['ERROR']:
559 self._isValidated = False
560 msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
561 raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
562 'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
563
564 # Must be ok if we got here!
565 msg.info('Executor {0} has validated successfully'.format(self.name))
566 self._isValidated = True
567 self._errMsg = ''
568
569 self._valStop = os.times()
570 msg.debug('valStop time is {0}'.format(self._valStop))
571
572
574 def __init__(self, name = 'Echo', trf = None):
575
576 # We are only changing the default name here
577 super(echoExecutor, self).__init__(name=name, trf=trf)
578
579
580 def execute(self):
581 self._exeStart = os.times()
582 msg.debug('exeStart time is {0}'.format(self._exeStart))
583 msg.info('Starting execution of %s', self._name)
584 msg.info('Transform argument dictionary now follows:')
585 for k, v in self.conf.argdict.items():
586 print("%s = %s" % (k, v))
587 self._hasExecuted = True
588 self._rc = 0
589 self._errMsg = ''
590 msg.info('%s executor returns %d', self._name, self._rc)
591 self._exeStop = os.times()
592 msg.debug('exeStop time is {0}'.format(self._exeStop))
593
594
596 def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
597
598 # We are only changing the default name here
599 super(dummyExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
600
601
602 def execute(self):
603 self._exeStart = os.times()
604 msg.debug('exeStart time is {0}'.format(self._exeStart))
605 msg.info('Starting execution of %s', self._name)
606 for type in self._outData:
607 for k, v in self.conf.argdict.items():
608 if type in k:
609 msg.info('Creating dummy output file: {0}'.format(self.conf.argdict[k].value[0]))
610 open(self.conf.argdict[k].value[0], 'a').close()
611 self._hasExecuted = True
612 self._rc = 0
613 self._errMsg = ''
614 msg.info('%s executor returns %d', self._name, self._rc)
615 self._exeStop = os.times()
616 msg.debug('exeStop time is {0}'.format(self._exeStop))
617
618
620 def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData = set(),
621 exe = None, exeArgs = None, memMonitor = True):
622 # Name of the script we want to execute
623 self._exe = exe
624
625 # With arguments (currently this means paste in the corresponding _argdict entry)
626 self._exeArgs = exeArgs
627
628 super(scriptExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
629
630 self._extraMetadata.update({'script' : exe})
631
632 # Decide if we echo script output to stdout
633 self._echoOutput = False
634
635 # Can either be written by base class or child
636 self._cmd = None
637
638 self._memMonitor = memMonitor
639
640 @property
641 def exe(self):
642 return self._exe
643
644 @exe.setter
645 def exe(self, value):
646 self._exe = value
647 self._extraMetadata['script'] = value
648
649 @property
650 def exeArgs(self):
651 return self._exeArgs
652
653 @exeArgs.setter
654 def exeArgs(self, value):
655 self._exeArgs = value
656# self._extraMetadata['scriptArgs'] = value
657
658 def preExecute(self, input = set(), output = set()):
659 self.setPreExeStart()
660 msg.debug('scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
661
662 self._input = input
663 self._output = output
664
665
666 if self._cmd is None:
668 msg.info('Will execute script as %s', self._cmd)
669
670 # Define this here to have it for environment detection messages
671 self._logFileName = "log.{0}".format(self._name)
672
673
675 if 'TRF_ECHO' in os.environ:
676 msg.info('TRF_ECHO envvar is set - enabling command echoing to stdout')
677 self._echoOutput = True
678 elif 'TRF_NOECHO' in os.environ:
679 msg.info('TRF_NOECHO envvar is set - disabling command echoing to stdout')
680 self._echoOutput = False
681 # PS1 is for sh, bash; prompt is for tcsh and zsh
682 elif isInteractiveEnv():
683 msg.info('Interactive environment detected (stdio or stdout is a tty) - enabling command echoing to stdout')
684 self._echoOutput = True
685 elif 'TZHOME' in os.environ:
686 msg.info('Tier-0 environment detected - enabling command echoing to stdout')
687 self._echoOutput = True
688 if self._echoOutput is False:
689 msg.info('Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self._name, self._logFileName))
690
691 # Now setup special loggers for logging execution messages to stdout and file
692 self._echologger = logging.getLogger(self._name)
693 self._echologger.setLevel(logging.INFO)
694 self._echologger.propagate = False
695
696 encargs = {'encoding' : 'utf-8'}
697 self._exeLogFile = logging.FileHandler(self._logFileName, mode='w', **encargs)
698 self._exeLogFile.setFormatter(logging.Formatter('%(asctime)s %(message)s', datefmt='%H:%M:%S'))
699 self._echologger.addHandler(self._exeLogFile)
700
701 if self._echoOutput:
702 self._echostream = logging.StreamHandler(_encoding_stream(sys.stdout))
703 self._echostream.setFormatter(logging.Formatter('%(name)s %(asctime)s %(message)s', datefmt='%H:%M:%S'))
704 self._echologger.addHandler(self._echostream)
705
707 if self._exe:
708 self._cmd = [self.exe, ]
709 else:
710 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
711 'No executor set in {0}'.format(self.__class__.__name__))
712 for arg in self.exeArgs:
713 if arg in self.conf.argdict:
714 # If we have a list then add each element to our list, else just str() the argument value
715 # Note if there are arguments which need more complex transformations then
716 # consider introducing a special toExeArg() method.
717 if isinstance(self.conf.argdict[arg].value, list):
718 self._cmd.extend([ str(v) for v in self.conf.argdict[arg].value])
719 else:
720 self._cmd.append(str(self.conf.argdict[arg].value))
721
722
723 def execute(self):
724 self._hasExecuted = True
725 msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))
726
727 self._exeStart = os.times()
728 msg.debug('exeStart time is {0}'.format(self._exeStart))
729 if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
730 msg.info('execOnly flag is set - execution will now switch, replacing the transform')
731 os.execvp(self._cmd[0], self._cmd)
732
733 encargs = {'encoding' : 'utf8'}
734 try:
735 # if we are already inside a container, then
736 # must map /srv of the nested container to /srv of the parent container:
737 # https://twiki.atlas-canada.ca/bin/view/AtlasCanada/Containers#Mount_point_summary
738 if self._alreadyInContainer and self._containerSetup is not None:
739 msg.info("chdir /srv to launch a nested container for the substep")
740 os.chdir("/srv")
741 p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
742 # go back to the original working directory immediately after to store prmon in it
743 if self._alreadyInContainer and self._containerSetup is not None:
744 msg.info("chdir {} after launching the nested container".format(self._workdir))
745 os.chdir(self._workdir)
746
747 if self._memMonitor:
748 try:
749 self._memSummaryFile = 'prmon.summary.' + self._name + '.json'
750 self._memFullFile = 'prmon.full.' + self._name
751 memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
752 '--json-summary', self._memSummaryFile, '--log-filename', 'prmon.' + self._name + '.log',
753 '--interval', '30']
754 mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
755 # TODO - link mem.full.current to mem.full.SUBSTEP
756 except Exception as e:
757 msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
758 self._memMonitor = False
759
760 while p.poll() is None:
761 try:
762 line = p.stdout.readline()
763 if line:
764 self._echologger.info(line.rstrip())
765 except UnicodeDecodeError as e:
766 msg.warning('Exception raised processing athena log: {0}'.format(e))
767 # Hoover up remaining buffered output lines
768 for line in p.stdout:
769 self._echologger.info(line.rstrip())
770
771 self._rc = p.returncode
772 msg.info('%s executor returns %d', self._name, self._rc)
773 self._exeStop = os.times()
774 msg.debug('exeStop time is {0}'.format(self._exeStop))
775 except OSError as e:
776 errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
777 msg.error(errMsg)
778 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC'), errMsg)
779 finally:
780 if self._memMonitor:
781 try:
782 mem_proc.send_signal(signal.SIGUSR1)
783 countWait = 0
784 while (not mem_proc.poll()) and countWait < 10:
785 time.sleep(0.1)
786 countWait += 1
787 except OSError:
788 pass
789
790
791 def postExecute(self):
792 if hasattr(self._exeLogFile, 'close'):
793 self._exeLogFile.close()
794 if self._memMonitor:
795 try:
796 memFile = open(self._memSummaryFile)
797 self._memStats = json.load(memFile)
798 except Exception as e:
799 msg.warning('Failed to load JSON memory summmary file {0}: {1}'.format(self._memSummaryFile, e))
800 self._memMonitor = False
801 self._memStats = {}
802
803
804 def validate(self):
805 if self._valStart is None:
806 self._valStart = os.times()
807 msg.debug('valStart time is {0}'.format(self._valStart))
808 self._hasValidated = True
809
810
811 if self._rc == 0:
812 msg.info('Executor {0} validated successfully (return code {1})'.format(self._name, self._rc))
813 self._isValidated = True
814 self._errMsg = ''
815 else:
816 # Want to learn as much as possible from the non-zero code
817 # this is a bit hard in general, although one can do signals.
818 # Probably need to be more specific per exe, i.e., athena non-zero codes
819 self._isValidated = False
820 if self._rc < 0:
821 # Map return codes to what the shell gives (128 + SIGNUM)
822 self._rc = 128 - self._rc
823 if trfExit.codeToSignalname(self._rc) != "":
824 self._errMsg = '{0} got a {1} signal (exit code {2})'.format(self._name, trfExit.codeToSignalname(self._rc), self._rc)
825 else:
826 self._errMsg = 'Non-zero return code from %s (%d)' % (self._name, self._rc)
827 raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_FAIL'), self._errMsg)
828
829
831 if 'checkEventCount' in self.conf.argdict and self.conf.argdict['checkEventCount'].returnMyValue(exe=self) is False:
832 msg.info('Event counting for substep {0} is skipped'.format(self.name))
833 else:
834 if 'mpi' in self.conf.argdict and not mpi.mpiShouldValidate():
835 msg.info('MPI mode -- skipping output event count check')
836 else:
837 checkcount=trfValidation.eventMatch(self)
838 checkcount.decide()
839 self._eventCount = checkcount.eventCount
840 msg.info('Event counting for substep {0} passed'.format(self.name))
841
842 self._valStop = os.times()
843 msg.debug('valStop time is {0}'.format(self._valStop))
844
845
846
    # Error messages longer than this are not copied into the exit message;
    # logscanExecutor.validate refers to it as athenaExecutor._exitMessageLimit.
    _exitMessageLimit = 200 # Maximum error message length to report in the exitMsg
    # Fallback error-mask file list, used by logscanExecutor.validate when neither
    # 'ignoreFiles' nor errorMaskFiles is provided.
    _defaultIgnorePatternFile = ['atlas_error_mask.db']
850
851
888 def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
889 inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'],
890 substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = {}, runtimeRunargs = {},
891 literalRunargs = [], dataArgs = [], checkEventCount = False, errorMaskFiles = None,
892 manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):
893
894 self._substep = forceToAlphaNum(substep)
895 self._inputEventTest = inputEventTest
896 self._tryDropAndReload = tryDropAndReload
897 self._extraRunargs = extraRunargs
898 self._runtimeRunargs = runtimeRunargs
899 self._literalRunargs = literalRunargs
900 self._dataArgs = dataArgs
901 self._errorMaskFiles = errorMaskFiles
902 self._inputDataTypeCountCheck = inputDataTypeCountCheck
903 self._disableMT = disableMT
904 self._disableMP = disableMP
905 self._onlyMP = onlyMP
906 self._onlyMT = onlyMT
907 self._onlyMPWithRunargs = onlyMPWithRunargs
908 self._skeletonCA=skeletonCA
909
910 if perfMonFile:
911 self._perfMonFile = None
912 msg.debug("Resource monitoring from PerfMon is now deprecated")
913
914 # SkeletonFile can be None (disable) or a string or a list of strings - normalise it here
915 if isinstance(skeletonFile, str):
916 self._skeleton = [skeletonFile]
917 else:
918 self._skeleton = skeletonFile
919
920 super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
921 exeArgs=exeArgs, memMonitor=memMonitor)
922
923 # Add athena specific metadata
924 self._extraMetadata.update({'substep': substep})
925
926 # Setup JO templates
927 if self._skeleton or self._skeletonCA:
928 self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
929 else:
930 self._jobOptionsTemplate = None
931
932 @property
934 return self._inputDataTypeCountCheck
935
936 @inputDataTypeCountCheck.setter
937 def inputDataTypeCountCheck(self, value):
938 self._inputDataTypeCountCheck = value
939
940 @property
941 def substep(self):
942 return self._substep
943
944 @property
945 def disableMP(self):
946 return self._disableMP
947
948 @disableMP.setter
949 def disableMP(self, value):
950 self._disableMP = value
951
952 @property
953 def disableMT(self):
954 return self._disableMT
955
956 @disableMT.setter
957 def disableMT(self, value):
958 self._disableMT = value
959
960 @property
961 def onlyMP(self):
962 return self._onlyMP
963
964 @onlyMP.setter
965 def onlyMP(self, value):
966 self._onlyMP = value
967
968 @property
969 def onlyMT(self):
970 return self._onlyMT
971
972 @onlyMT.setter
973 def onlyMT(self, value):
974 self._onlyMT = value
975
976 def skeletonCA(self):
977 return self._skeletonCA
978
979 def preExecute(self, input = set(), output = set()):
980 self.setPreExeStart()
981 msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
982
983 # Check we actually have events to process!
984 inputEvents = 0
985 dt = ""
986 if self._inputDataTypeCountCheck is None:
987 self._inputDataTypeCountCheck = input
988 for dataType in self._inputDataTypeCountCheck:
989 if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
990 continue
991
992 thisInputEvents = self.conf.dataDictionary[dataType].nentries
993 if thisInputEvents > inputEvents:
994 inputEvents = thisInputEvents
995 dt = dataType
996
997 # Now take into account skipEvents and maxEvents
998 if ('skipEvents' in self.conf.argdict and
999 self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
1000 mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1001 else:
1002 mySkipEvents = 0
1003
1004 if ('maxEvents' in self.conf.argdict and
1005 self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
1006 myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1007 else:
1008 myMaxEvents = -1
1009
1010 # Any events to process...?
1011 if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
1012 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
1013 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(mySkipEvents, inputEvents, dt))
1014
1015 try:
1016 # Expected events to process
1017 if (myMaxEvents != -1):
1018 if (self.inData and next(iter(self.inData)) == 'inNULL'):
1019 expectedEvents = myMaxEvents
1020 else:
1021 expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
1022 else:
1023 expectedEvents = inputEvents-mySkipEvents
1024 except TypeError:
1025 # catching type error from UNDEFINED inputEvents count
1026 msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
1027 expectedEvents = 0
1028
1029
1032 OSSetupString = None
1033
1034 # Extract the asetup string
1035 asetupString = None
1036 legacyThreadingRelease = False
1037 if 'asetup' in self.conf.argdict:
1038 asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1039 legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1040 else:
1041 msg.info('Asetup report: {0}'.format(asetupReport()))
1042
1043 if asetupString is not None:
1044 legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1045 currentOS = os.environ['ALRB_USER_PLATFORM']
1046 if legacyOSRelease and "centos7" not in currentOS:
1047 OSSetupString = "centos7"
1048 msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
1049
1050
1051 # allow overriding the container OS using a flag
1052 if 'runInContainer' in self.conf.argdict:
1053 OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1054 msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
1055 if OSSetupString is not None and asetupString is None:
1056 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
1057 '--asetup must be used for the substep which requires --runInContainer')
1058
1059 # Conditional MP based on runtime arguments
1060 if self._onlyMPWithRunargs:
1061 for k in self._onlyMPWithRunargs:
1062 if k in self.conf._argdict:
1063 self._onlyMP = True
1064
1065 # Check the consistency of parallel configuration: CLI flags + evnironment.
1066 if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
1067 ('ATHENA_CORE_NUMBER' not in os.environ)):
1068 # At least one of the parallel command-line flags has been provided but ATHENA_CORE_NUMBER environment has not been set
1069 msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1070 else:
1071 # Try to detect AthenaMT mode, number of threads and number of concurrent events
1072 if not self._disableMT:
1073 self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1074
1075 # Try to detect AthenaMP mode and number of workers
1076 if not self._disableMP:
1077 self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
1078
1079 # Check that we actually support MT
1080 if self._onlyMP and self._athenaMT > 0:
1081 msg.info("This configuration does not support MT, falling back to MP")
1082 if self._athenaMP == 0:
1083 self._athenaMP = self._athenaMT
1084 self._athenaMT = 0
1086
1087 # Check that we actually support MP
1088 if self._onlyMT and self._athenaMP > 0:
1089 msg.info("This configuration does not support MP, using MT")
1090 if self._athenaMT == 0:
1091 self._athenaMT = self._athenaMP
1093 self._athenaMP = 0
1094
1095 # Small hack to detect cases where there are so few events that it's not worthwhile running in MP mode
1096 # which also avoids issues with zero sized files. Distinguish from the no-input case (e.g. evgen)
1097 if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
1098 msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
1099 self._disableMP = True
1100 self._athenaMP = 0
1101
1102 # Handle executor steps
1103 if self.conf.totalExecutorSteps > 1:
1104 for dataType in output:
1105 if self.conf._dataDictionary[dataType].originalName:
1106 self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1107 else:
1108 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1109 self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
1110 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1111
1112 # And if this is (still) athenaMP, then set some options for workers and output file report
1113 if self._athenaMP > 0:
1114 self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
1115 self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
1116 self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
1117 if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
1119 else:
1120 self._athenaMPReadEventOrders = False
1121 # Decide on scheduling
1122 if ('athenaMPStrategy' in self.conf.argdict and
1123 (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
1124 self._athenaMPStrategy = self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1125 else:
1126 self._athenaMPStrategy = 'SharedQueue'
1127
1128 # See if we have options for the target output file size
1129 if 'athenaMPMergeTargetSize' in self.conf.argdict:
1130 for dataType in output:
1131 if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
1132 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B
1133 msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1134 else:
1135 # Use a globbing strategy
1136 matchedViaGlob = False
1137 for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
1138 if fnmatch(dataType, mtsType):
1139 self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B
1140 msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1141 matchedViaGlob = True
1142 break
1143 if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
1144 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B
1145 msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1146
1147 # For AthenaMP jobs we ensure that the athena outputs get the suffix _000
1148 # so that the mother process output file (if it exists) can be used directly
1149 # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
1150 for dataType in output:
1151 if self.conf.totalExecutorSteps <= 1:
1152 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1153 if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
1154 if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
1155 msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
1156 else:
1157 self.conf._dataDictionary[dataType].value[0] += "_000"
1158 msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
1159 else:
1161
1162
1163 if 'mpi' in self.conf.argdict:
1164 msg.info("Running in MPI mode")
1165 mpi.setupMPIConfig(output, self.conf.dataDictionary)
1166
1167
1168 if self._skeleton or self._skeletonCA:
1169 inputFiles = dict()
1170 for dataType in input:
1171 inputFiles[dataType] = self.conf.dataDictionary[dataType]
1172 outputFiles = dict()
1173 for dataType in output:
1174 outputFiles[dataType] = self.conf.dataDictionary[dataType]
1175
1176 # See if we have any 'extra' file arguments
1177 nameForFiles = commonExecutorStepName(self._name)
1178 for dataType, dataArg in self.conf.dataDictionary.items():
1179 if isinstance(dataArg, list) and dataArg:
1180 if self.conf.totalExecutorSteps <= 1:
1181 raise ValueError('Multiple input arguments provided but only running one substep')
1182 if self.conf.totalExecutorSteps != len(dataArg):
1183 raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1184
1185 if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
1186 inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1187 else:
1188 if dataArg.io == 'input' and nameForFiles in dataArg.executor:
1189 inputFiles[dataArg.subtype] = dataArg
1190
1191 msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1192
1193 # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
1194 self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1195 output = outputFiles)
1196
1197
1199 if len(input) > 0:
1200 self._extraMetadata['inputs'] = list(input)
1201 if len(output) > 0:
1202 self._extraMetadata['outputs'] = list(output)
1203
1204
1205 dbrelease = dbsetup = None
1206 if 'DBRelease' in self.conf.argdict:
1207 dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1208 if path.islink(dbrelease):
1209 dbrelease = path.realpath(dbrelease)
1210 if dbrelease:
1211 # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
1212 dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1213 if dbdMatch:
1214 msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1215 if not os.access(dbrelease, os.R_OK):
1216 msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1217 msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1218 dbrelease = dbdMatch.group(1)
1219 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1220 else:
1221 # Check if the DBRelease is setup
1222 msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1223 unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1224 if unpacked:
1225 # Now run the setup.py script to customise the paths to the current location...
1226 setupDBRelease(dbsetup)
1227 # For cvmfs we want just the X.Y.Z release string (and also support 'current')
1228 else:
1229 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1230
1231
1233 self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1235
1236
1237 super(athenaExecutor, self).preExecute(input, output)
1238
1239 # Now we always write a wrapper, because it's very convenient for re-running individual substeps
1240 # This will have asetup and/or DB release setups in it
1241 # Do this last in this preExecute as the _cmd needs to be finalised
1242 msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
1243 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1244 msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1245
1246
def postExecute(self):
    """Finalise the outputs of this athena substep after the payload has run.

    Steps, in order:

    * run the parent class ``postExecute`` and, when ``mpi`` is in the
      arguments, call ``mpi.mergeOutputs()``;
    * when the substep is split into several executor steps, pass this step's
      AthenaMP outputs to ``athenaMPOutputHandler`` and, on the last step only,
      rebuild every output's file list from all steps and merge multi-file
      outputs with ``_smartMerge``;
    * otherwise, for an AthenaMP run, pass the outputs to
      ``athenaMPOutputHandler`` (asking it to skip file checks under
      ``eventService``) and merge multi-file outputs;
    * call ``_targzipJiveXML`` when a ``TXT_JIVEXMLTGZ`` output exists and,
      for the ReSim substep, report the events that passed ISF_SimEventFilter;
    * with ``deleteIntermediateOutputfiles``, remove this substep's
      ``temporary`` input files, and the target file when an input is a
      symbolic link.
    """
    super(athenaExecutor, self).postExecute()
    # MPI merging
    if 'mpi' in self.conf.argdict:
        mpi.mergeOutputs()

    # Handle executor substeps
    if self.conf.totalExecutorSteps > 1:
        if self._athenaMP > 0:
            outputDataDictionary = {dataType: self.conf.dataDictionary[dataType] for dataType in self._output}
            athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary,
                                  self._athenaMP, False, self.conf.argdict)
        # Only the last executor step combines the outputs of all the steps
        if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
            # first loop over datasets for the output
            for dataType in self._output:
                dataArg = self.conf.dataDictionary[dataType]
                newValue = []
                if self._athenaMP > 0:
                    # assume the same number of workers all the time: the other steps'
                    # file names are derived from this step's ones by swapping the step tag
                    thisStepTag = '_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep)
                    for i in range(self.conf.totalExecutorSteps):
                        stepTag = '_{0}{1}_'.format(executorStepSuffix, i)
                        newValue.extend(v.replace(thisStepTag, stepTag) for v in dataArg.value)
                else:
                    dataArg.multipleOK = True
                    # just combine all executors
                    newValue.extend(dataArg.originalName + '_{0}{1}'.format(executorStepSuffix, i)
                                    for i in range(self.conf.totalExecutorSteps))
                dataArg.value = newValue

                # do the merging if needed
                if dataArg.io == "output" and len(dataArg.value) > 1:
                    self._smartMerge(dataArg)

    # If this was an athenaMP run then we need to update output files
    elif self._athenaMP > 0:
        outputDataDictionary = {dataType: self.conf.dataDictionary[dataType] for dataType in self._output}

        # With eventService enabled, ask the output handler to skip its file checks
        skipFileChecks = 'eventService' in self.conf.argdict and bool(self.conf.argdict['eventService'].value)
        athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary,
                              self._athenaMP, skipFileChecks, self.conf.argdict)
        for dataType in self._output:
            dataArg = self.conf.dataDictionary[dataType]
            if dataArg.io == "output" and len(dataArg.value) > 1:
                self._smartMerge(dataArg)

    if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
        self._targzipJiveXML()

    # Summarise events passed the filter ISF_SimEventFilter from log.ReSim
    # This is a bit ugly to have such a specific feature here though
    # TODO
    # The best is to have a general approach so that user can extract useful info from log
    # Instead of hard coding a pattern, one idea could be that user provides a regExp pattern
    # in which the wanted variable is grouped by a name, then transforms could decode the pattern
    # and use it to extract required info and do the summation during log scan.
    if self._logFileName == 'log.ReSim' and self.name == 'ReSim':
        msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
        self._resimevents = reportEventsPassedSimFilter(self._logFileName)

    # Remove intermediate input/output files of sub-steps
    # Delete only files with io="temporary" which are files with pattern "tmp*"
    # Some stubs like tmp.RDO_TRIG_000 created in AthenaMP mode or
    # tmp.HIST_ESD_INT, tmp.HIST_AOD_INT as input to DQHistogramMerge.py are not deleted
    # Enable if --deleteIntermediateOutputfiles is set
    if ('deleteIntermediateOutputfiles' in self.conf._argdict and
            self.conf._argdict['deleteIntermediateOutputfiles'].value):
        inputDataDictionary = {dataType: self.conf.dataDictionary[dataType] for dataType in self._input}

        for k, v in inputDataDictionary.items():
            if v.io != 'temporary':
                continue
            for filename in v.value:
                if not os.access(filename, os.R_OK) or filename.startswith("/cvmfs"):
                    continue
                msg.info("Removing intermediate {0} input file {1}".format(k, filename))
                # If this is a symbolic link, resolve its target *before* removing the
                # link so that the linked file can be deleted as well. Testing islink()
                # (rather than comparing realpath() with the name) avoids treating every
                # relative path as a link, and always binds targetpath for this file.
                targetpath = os.path.realpath(filename) if os.path.islink(filename) else None
                os.unlink(filename)
                # Apply the same /cvmfs guard to the link target as to the file itself
                if targetpath and os.access(targetpath, os.R_OK) and not targetpath.startswith("/cvmfs"):
                    os.unlink(targetpath)
1325
1326
# Validate this athena substep.
# Flow: the parent class's validate() runs first and any failure it raises is held back
# in deferredException; the memory monitor output is then optionally fitted to look for a
# possible leak; the athena logfile is scanned with the configured ignore patterns; finally
# the deferred failure is re-raised, or a logfile error at level ERROR or worse is raised
# (unless tolerated via ignoreErrors or MPI mode, see below).
# Side effects: sets self._hasValidated and self._isValidated; sets self._memLeakResult when
# the memory monitor file is analysed; records self._valStop only on the success path.
# Raises: the deferred exception from the parent validate() (logfile and memory-leak details
# appended to its errMsg), or trfExceptions.TransformLogfileErrorException (TRF_EXEC_LOGERROR).
1327 def validate(self):
1328 self.setValStart()
1329 self._hasValidated = True
1330 deferredException = None
# Threshold compared below against the fitted 'slope' of the memory monitor data
# (the messages further down report that slope in KB/s)
1331 memLeakThreshold = 5000
1332 _hasMemLeak = False
1333
1334
# NOTE(review): this rendering skips source line 1337, presumably the `except ... as e:`
# header that binds `e` used in the handler below; confirm against the original file.
1335 try:
1336 super(athenaExecutor, self).validate()
1338 # In this case we hold this exception until the logfile has been scanned
1339 msg.error('Validation of return code failed: {0!s}'.format(e))
1340 deferredException = e
1341
1342
# NOTE(review): source lines 1343-1349 are not shown here; they presumably hold the condition
# (likely on self._memFullFile) that the `else:` at line 1358 pairs with. Confirm.
1350 msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
1351 self._memLeakResult = analytic().getFittedData(self._memFullFile)
1352 if self._memLeakResult:
1353 if self._memLeakResult['slope'] > memLeakThreshold:
1354 _hasMemLeak = True
1355 msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
1356 else:
1357 msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
1358 else:
1359 msg.info('No memory monitor file to be analysed')
1360
1361 # Logfile scan setup
1362 # Always use ignorePatterns from the command line
1363 # For patterns in files, prefer the command line first, then any special settings for
1364 # this executor, then fallback to the standard default (atlas_error_mask.db)
1365 if 'ignorePatterns' in self.conf.argdict:
1366 igPat = self.conf.argdict['ignorePatterns'].value
1367 else:
1368 igPat = []
1369 if 'ignoreFiles' in self.conf.argdict:
1370 ignorePatterns = trfValidation.ignorePatterns(files = self.conf.argdict['ignoreFiles'].value, extraSearch=igPat)
1371 elif self._errorMaskFiles is not None:
1372 ignorePatterns = trfValidation.ignorePatterns(files = self._errorMaskFiles, extraSearch=igPat)
1373 else:
1374 ignorePatterns = trfValidation.ignorePatterns(files = athenaExecutor._defaultIgnorePatternFile, extraSearch=igPat)
1375
1376 # Now actually scan my logfile
# NOTE(review): source line 1378 is not shown; it begins the statement that ends with
# `ignoreList=ignorePatterns)` and presumably assigns self._logScan, which is read below.
# Source line 1382 is also missing from this rendering. Confirm both.
1377 msg.info('Scanning logfile {0} for errors in substep {1}'.format(self._logFileName, self._substep))
1379 ignoreList=ignorePatterns)
1380 worstError = self._logScan.worstError()
1381 eventLoopWarnings = self._logScan.eventLoopWarnings()
1383
1384
1385 # In general we add the error message to the exit message, but if it's too long then don't do
1386 # that and just say look in the jobReport
# Messages longer than athenaExecutor._exitMessageLimit become a pointer to their first line,
# with dedicated wording for CoreDumpSvc and G4Exception messages
1387 if worstError['firstError']:
1388 if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
1389 if 'CoreDumpSvc' in worstError['firstError']['message']:
1390 exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1391 elif 'G4Exception' in worstError['firstError']['message']:
1392 exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
1393 else:
1394 exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
1395 else:
1396 exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
1397 else:
1398 exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
1399
1400 # If we failed on the rc, then abort now
# A failed return-code check takes precedence over the logfile verdict; it is re-raised only
# here so that the logfile and memory information can be appended to its message
1401 if deferredException is not None:
1402 # Add any logfile information we have
1403 if worstError['nLevel'] >= stdLogLevels['ERROR']:
1404 deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
1405 # Add the result of memory analysis
1406 if _hasMemLeak:
1407 deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1408 raise deferredException
1409
1410
1411 # Very simple: if we get ERROR or worse, we're dead, except if ignoreErrors=True
# Note: ignoreErrors only tolerates a worst level of exactly ERROR (== test below); a worse
# level still fails in the last branch unless the MPI branch applies
1412 if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
1413 msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
1414 # Act as if ignoreErrors=True if running in MPI, because we want to be tolerant to the occasional event failure
1415 elif worstError['nLevel'] >= stdLogLevels['ERROR'] and (not mpi.mpiShouldValidate()):
1416 msg.warning(f'Found {worstError["level"]} in the logfile in MPI rank {mpi.getMPIRank()} but moving on to be failure-tolerant')
1417 elif worstError['nLevel'] >= stdLogLevels['ERROR']:
1418 self._isValidated = False
1419 msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
1420 # Add the result of memory analysis
1421 if _hasMemLeak:
1422 exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
1423 raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
1424 'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
1425
1426 # Print event loop warnings
1427 if (len(eventLoopWarnings) > 0):
1428 msg.warning('Found WARNINGS in the event loop, as follows:')
1429 for element in eventLoopWarnings:
1430 msg.warning('{0} {1} ({2} instances)'.format(element['item']['service'],element['item']['message'],element['count']))
1431
1432 # Must be ok if we got here!
1433 msg.info('Executor {0} has validated successfully'.format(self.name))
1434 self._isValidated = True
1435
# NOTE(review): self._valStop is only recorded on this success path; the raise statements
# above leave it unset.
1436 self._valStop = os.times()
1437 msg.debug('valStop time is {0}'.format(self._valStop))
1438
1439
1440 def _isCAEnabled(self):
1441 # CA not present
1442 if 'CA' not in self.conf.argdict:
1443 # If there is no legacy skeleton, then we are running with CA
1444 if not self._skeleton:
1445 return True
1446 else:
1447 return False
1448
1449 # CA present but None, all substeps running with CA
1450 if self.conf.argdict['CA'] is None:
1451 return True
1452
1453 # CA enabled for a substep, running with CA
1454 if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
1455 return True
1456
1457 return False
1458
1459
1461
# Build the athena command line for this substep in self._cmd.
# NOTE(review): the `def` line of this method (source lines 1462-1463) is not shown in this
# rendering; the statements below are its body.
# Order of construction: the executable (the 'athena' argument, when present, replaces
# self._exe); the athenaopts selected for this substep (name first, then alias, then 'all');
# an LD_PRELOAD update folded into --preloadlib; an optional --drop-and-reload; --threads /
# --nprocs for non-CA jobs; and finally the top options files, which must come last.
# Side effects: may modify self.conf.argdict['athenaopts'] in place (merging the alias entry
# into the name entry, rewriting or adding --preloadlib), or create that argument.
1464 if 'athena' in self.conf.argdict:
1465 self._exe = self.conf.argdict['athena'].value
1466 self._cmd = [self._exe]
1467
1468 # Find options for the current substep. Name is prioritised (e.g. RAWtoALL) over alias (e.g. r2a). Last look for 'all'
1469 currentSubstep = None
1470 if 'athenaopts' in self.conf.argdict:
1471 currentName = commonExecutorStepName(self.name)
1472 if currentName in self.conf.argdict['athenaopts'].value:
1473 currentSubstep = currentName
1474 if self.substep in self.conf.argdict['athenaopts'].value:
1475 msg.info('Athenaopts found for {0} and {1}, joining options. '
1476 'Consider changing your configuration to use just the name or the alias of the substep.'
1477 .format(currentSubstep, self.substep))
1478 self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
1479 del self.conf.argdict['athenaopts'].value[self.substep]
1480 msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
1481 elif self.substep in self.conf.argdict['athenaopts'].value:
1482 currentSubstep = self.substep
1483 elif 'all' in self.conf.argdict['athenaopts'].value:
1484 currentSubstep = 'all'
1485
1486 # See if there's a preloadlibs and a request to update LD_PRELOAD for athena
# If the environment update sets LD_PRELOAD, hand it to athena through --preloadlib: merge it
# into an existing --preloadlib option for this substep, otherwise append/create one.
# The merged list is built from a set union, so the order of preload entries is not kept.
1487 preLoadUpdated = dict()
1488 if 'LD_PRELOAD' in self._envUpdate._envdict:
1489 preLoadUpdated[currentSubstep] = False
1490 if 'athenaopts' in self.conf.argdict:
1491 if currentSubstep is not None:
1492 for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
1493 # This code is pretty ugly as the athenaopts argument contains
1494 # strings which are really key/value pairs
1495 if athArg.startswith('--preloadlib'):
1496 try:
1497 i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
1498 v = athArg.split('=', 1)[1]
1499 msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1500 newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":")))
1501 self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
1502 except Exception as e:
1503 msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
1504 preLoadUpdated[currentSubstep] = True
1505 break
1506 if not preLoadUpdated[currentSubstep]:
1507 msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
1508 if 'athenaopts' in self.conf.argdict:
1509 if currentSubstep is not None:
1510 self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD')))
1511 else:
1512 self.conf.argdict['athenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]
1513 else:
1514 self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))])
1515
1516 # Now update command line with the options we have (including any changes to preload)
# currentSubstep can still be None here together with an 'all' entry when that entry was just
# created for --preloadlib above
1517 if 'athenaopts' in self.conf.argdict:
1518 if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
1519 self._cmd.extend(self.conf.argdict['athenaopts'].value['all'])
1520 elif currentSubstep in self.conf.argdict['athenaopts'].value:
1521 self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])
1522
# From here on, a substep without specific options is looked up as 'all'
1523 if currentSubstep is None:
1524 currentSubstep = 'all'
1525
# Append --drop-and-reload only where it cannot conflict: not for CA configurations, not with
# Valgrind, and not when this substep's athenaopts already carry --config-only/--drop-and-reload
1526 if self._tryDropAndReload:
1527 if self._isCAEnabled():
1528 msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
1529 elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1530 msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
1531 elif 'athenaopts' in self.conf.argdict:
1532 athenaConfigRelatedOpts = ['--config-only','--drop-and-reload']
1533 # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string
1534 if currentSubstep in self.conf.argdict['athenaopts'].value:
1535 conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
1536 if len(conflictOpts) > 0:
1537 msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
1538 else:
1539 msg.info('Appending "--drop-and-reload" to athena options')
1540 self._cmd.append('--drop-and-reload')
1541 else:
1542 msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
1543 self._cmd.append('--drop-and-reload')
1544 else:
1545 # This is the 'standard' case - so drop and reload should be ok
1546 msg.info('Appending "--drop-and-reload" to athena options')
1547 self._cmd.append('--drop-and-reload')
1548 else:
1549 msg.info('Skipping test for "--drop-and-reload" in this executor')
1550
# NOTE(review): when 'athenaopts' exists but has no entry for currentSubstep (which is 'all'
# here if nothing matched above), the `.value[currentSubstep]` lookups in the --threads and
# --nprocs checks below would raise KeyError unless that mapping supplies defaults. Consider
# guarding with `currentSubstep in ...value`, as the drop-and-reload branch above does.
1551 if not self._isCAEnabled(): #For CA-jobs, threads and nproc set in runargs file
1552 # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded
1553 if self._athenaMT > 0 and not self._disableMT:
1554 if not ('athenaopts' in self.conf.argdict and
1555 any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1556 self._cmd.append('--threads=%s' % str(self._athenaMT))
1557
1558 # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess
1559 if self._athenaMP > 0 and not self._disableMP:
1560 if not ('athenaopts' in self.conf.argdict and
1561 any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
1562 self._cmd.append('--nprocs=%s' % str(self._athenaMP))
1563
1564 # Add topoptions
1565 # Note that _writeAthenaWrapper removes this from the end of _cmd when preparing the options for VTuneCommand, so assumes it comes last.
1566 if self._skeleton or self._skeletonCA:
1567 self._cmd += self._topOptionsFiles
1568 msg.info('Updated script arguments with topoptions: %s', self._cmd)
1569
1570
1571
1573 self,
1574 asetup = None,
1575 dbsetup = None,
1576 ossetup = None
1577 ):
1578 self._originalCmd = self._cmd
1579 self._asetup = asetup
1580 self._dbsetup = dbsetup
1581 self._containerSetup = ossetup
1582 self._workdir = os.getcwd()
1583 self._alreadyInContainer = self._workdir.startswith("/srv")
1584 self._wrapperFile = 'runwrapper.{name}.sh'.format(name = self._name)
1585 self._setupFile = 'setup.{name}.sh'.format(name = self._name)
1586
1587 # Create a setupATLAS script
1588 setupATLAS = 'my_setupATLAS.sh'
1589 with open(setupATLAS, 'w') as f:
1590 print("#!/bin/bash", file=f)
1591 print("""
1592if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1593 export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1594fi
1595source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1596 , file=f)
1597 os.chmod(setupATLAS, 0o755)
1598
1599 msg.debug(
1600 'Preparing wrapper file {wrapperFileName} with '
1601 'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1602 wrapperFileName = self._wrapperFile,
1603 asetupStatus = self._asetup,
1604 dbsetupStatus = self._dbsetup
1605 )
1606 )
1607
1608 container_cmd = None
1609 try:
1610 with open(self._wrapperFile, 'w') as wrapper:
1611 print('#!/bin/sh', file=wrapper)
1612 if self._containerSetup is not None:
1613 container_cmd = [ os.path.abspath(setupATLAS),
1614 "-c",
1615 self._containerSetup,
1616 "--pwd",
1617 self._workdir,
1618 "-s",
1619 os.path.join('.', self._setupFile),
1620 "-r"]
1621 print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1622 print('echo " '+ " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
1623 print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1624 file = wrapper)
1625 print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1626 file=wrapper)
1627 print('echo', file=wrapper)
1628
1629 if asetup:
1630 wfile = wrapper
1631 asetupFile = None
1632 # if the substep is executed within a container, setup athena with a separate script
1633 # e.g. setupATLAS -c el9 -s setup.RDOtoRDOTrigger.sh -r runwrapper.RDOtoRDOTrigger.sh ...
1634 if self._containerSetup is not None:
1635 asetupFile = open(self._setupFile, 'w')
1636 wfile = asetupFile
1637 print(f'source ./{setupATLAS} -q', file=wfile)
1638 print(f'asetup {asetup}', file=wfile)
1639 print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1640 if dbsetup:
1641 dbroot = path.dirname(dbsetup)
1642 dbversion = path.basename(dbroot)
1643 print("# DBRelease setup", file=wrapper)
1644 print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1645 print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1646 print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1647 print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
1648 print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
1649 print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1650 if self._disableMT:
1651 print("# AthenaMT explicitly disabled for this executor", file=wrapper)
1652 if self._disableMP:
1653 print("# AthenaMP explicitly disabled for this executor", file=wrapper)
1654 if self._envUpdate.len > 0:
1655 for envSetting in self._envUpdate.values:
1656 if not envSetting.startswith('LD_PRELOAD'):
1657 print("export", envSetting, file=wrapper)
1658 # If Valgrind is engaged, a serialised Athena configuration file
1659 # is generated for use with a subsequent run of Athena with
1660 # Valgrind.
1661 if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
1662 msg.info('Valgrind engaged')
1663 # Define the file name of the serialised Athena
1664 # configuration.
1665 AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1666 name = self._name
1667 )
1668 # Run Athena for generation of its serialised configuration.
1669 print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1670 print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1671 # Generate a Valgrind command, suppressing or ussing default
1672 # options as requested and extra options as requested.
1673 if 'valgrindDefaultOpts' in self.conf._argdict:
1674 defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
1675 else:
1676 defaultOptions = True
1677 if 'valgrindExtraOpts' in self.conf._argdict:
1678 extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
1679 else:
1680 extraOptionsList = None
1681 msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
1682 msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1683 command = ValgrindCommand(
1684 defaultOptions = defaultOptions,
1685 extraOptionsList = extraOptionsList,
1686 AthenaSerialisedConfigurationFile = \
1687 AthenaSerialisedConfigurationFile
1688 )
1689 msg.debug("Valgrind command: {command}".format(command = command))
1690 print(command, file=wrapper)
1691 # If VTune is engaged, a serialised Athena configuration file
1692 # is generated for use with a subsequent run of Athena with
1693 # VTune.
1694 elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
1695 msg.info('VTune engaged')
1696 # Define the file name of the serialised Athena
1697 # configuration.
1698 AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
1699 name = self._name
1700 )
1701 # Run Athena for generation of its serialised configuration.
1702 print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1703 print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1704 # Generate a VTune command, suppressing or ussing default
1705 # options as requested and extra options as requested.
1706 if 'vtuneDefaultOpts' in self.conf._argdict:
1707 defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
1708 else:
1709 defaultOptions = True
1710 if 'vtuneExtraOpts' in self.conf._argdict:
1711 extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
1712 else:
1713 extraOptionsList = None
1714
1715 # replace the _topOptionsFiles from the Athena command with the AthenaSerialisedConfigurationFile.
1716 if (self._skeleton or self._skeletonCA) and len(self._topOptionsFiles) > 0:
1717 AthenaCommand = self._cmd[:-len(self._topOptionsFiles)]
1718 else:
1719 AthenaCommand = self._cmd
1720 AthenaCommand.append(AthenaSerialisedConfigurationFile)
1721
1722 msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
1723 msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
1724 command = VTuneCommand(
1725 defaultOptions = defaultOptions,
1726 extraOptionsList = extraOptionsList,
1727 AthenaCommand = AthenaCommand
1728 )
1729 msg.debug("VTune command: {command}".format(command = command))
1730 print(command, file=wrapper)
1731 else:
1732 msg.info('Valgrind/VTune not engaged')
1733 # run Athena command
1734 print(' '.join(self._cmd), file=wrapper)
1735 os.chmod(self._wrapperFile, 0o755)
1736 except OSError as e:
1737 errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
1738 fileName = self._wrapperFile,
1739 error = e
1740 )
1741 msg.error(errMsg)
1743 trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
1744 errMsg
1745 )
1746 self._cmd = [ path.join('.', self._wrapperFile) ]
1747 if self._containerSetup is not None:
1748 asetupFile.close()
1749 self._cmd = container_cmd + self._cmd
1750
1751
1752
def _smartMerge(self, fileArg):
    """Merge the files of an output file argument into groups close to a target size.

    The files in ``fileArg.value`` are taken in order and packed greedily into
    merge groups. A file joins the current group if that moves the group's
    summed ``file_size`` metadata closer to ``fileArg.mergeTargetSize``; a
    negative target means "always join". Otherwise the file starts a new group.
    An empty group always accepts the next file.

    A single group is merged onto ``fileArg.originalName``. Several groups get
    ``originalName + '_<n>'`` names that do not already exist on disk. Groups
    holding at most one file are skipped. Every real merge is delegated to
    ``fileArg.selfMerge(...)``, whose return value is appended to
    ``self._myMerger``.

    Nothing is merged if ``fileArg`` has no ``selfMerge`` method, if its
    ``mergeTargetSize`` is 0, or if any file lacks integer size metadata.

    @param fileArg Output file argument whose files should be merged.
    """
    if not hasattr(fileArg, 'selfMerge'):
        msg.info('Files in {0} cannot be merged (no selfMerge() method is implemented)'.format(fileArg.name))
        return

    if fileArg.mergeTargetSize == 0:
        msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
        return

    # First pass: greedily split the files into merge groups
    mergeCandidates = [list()]
    currentMergeSize = 0
    for fname in fileArg.value:
        size = fileArg.getSingleMetadata(fname, 'file_size')
        if not isinstance(size, int):
            # Name the offending file, not the whole argument
            msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fname, type(size)))
            return
        # If there is no file in the current group yet, then we must add it
        if not mergeCandidates[-1]:
            msg.debug('Adding file {0} to current empty merge list'.format(fname))
            mergeCandidates[-1].append(fname)
            currentMergeSize += size
            continue
        # See if adding this file gets us closer to the target size (but always add if target size is negative)
        if fileArg.mergeTargetSize < 0 or abs(currentMergeSize + size - fileArg.mergeTargetSize) < abs(currentMergeSize - fileArg.mergeTargetSize):
            msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
            mergeCandidates[-1].append(fname)
            currentMergeSize += size
            continue
        # Close this merge list and start a new one
        msg.debug('Starting a new merge list with file {0}'.format(fname))
        mergeCandidates.append([fname])
        currentMergeSize = size

    msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))

    if len(mergeCandidates) == 1:
        # Merging to a single file, so use the original filename that the transform
        # was started with
        mergeNames = [fileArg.originalName]
    else:
        # Multiple merge targets, so we need a set of unique names that do not clash
        # with files already on disk. Note that the individual worker files get numbered
        # with 3 digit padding, so these non-padded merges should be fine
        counter = 0
        mergeNames = []
        for _ in mergeCandidates:
            mergeName = fileArg.originalName + '_{0}'.format(counter)
            while path.exists(mergeName):
                counter += 1
                mergeName = fileArg.originalName + '_{0}'.format(counter)
            mergeNames.append(mergeName)
            counter += 1

    # Now actually do the merges; counter is the index of the merge group
    for counter, (targetName, mergeGroup) in enumerate(zip(mergeNames, mergeCandidates)):
        msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
        if len(mergeGroup) <= 1:
            msg.info('Skip merging for single file')
        else:
            self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
1816
1817
1819 #tgzipping JiveXML files
1820 targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
1821 if os.path.exists(targetTGZName):
1822 os.remove(targetTGZName)
1823
1824 import tarfile
1825 fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")
1826
1827 # force gz compression
1828 tar = tarfile.open(targetTGZName, "w:gz")
1829 for fName in os.listdir('.'):
1830 matches = fNameRE.findall(fName)
1831 if len(matches) > 0:
1832 if fNameRE.findall(fName)[0] == fName:
1833 msg.info('adding %s to %s', fName, targetTGZName)
1834 tar.add(fName)
1835
1836 tar.close()
1837 msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
1838
1839
1840
1842
1843 # Here we validate, but will suppress any errors
1844 def validate(self):
1845 self.setValStart()
1846 try:
1847 super(optionalAthenaExecutor, self).validate()
1849 # In this case we hold this exception until the logfile has been scanned
1850 msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
1851 self._isValidated = False
1852 self._errMsg = e.errMsg
1853 self._rc = e.errCode
1854 self._valStop = os.times()
1855 msg.debug('valStop time is {0}'.format(self._valStop))
1856
1857
1859
def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
             inData = set(), outData = set(), exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True,
             perfMonFile = None, tryDropAndReload = True, extraRunargs = {},
             manualDataDictionary = None, memMonitor = True):
    """Initialise the hybrid POOL merger athena executor.

    This constructor only supplies defaults. The executor name is passed on
    positionally, and every other argument is forwarded unchanged, by keyword,
    to the parent class constructor.
    """
    forwardedArgs = dict(
        trf=trf,
        conf=conf,
        skeletonFile=skeletonFile,
        skeletonCA=skeletonCA,
        inData=inData,
        outData=outData,
        exe=exe,
        exeArgs=exeArgs,
        substep=substep,
        inputEventTest=inputEventTest,
        perfMonFile=perfMonFile,
        tryDropAndReload=tryDropAndReload,
        extraRunargs=extraRunargs,
        manualDataDictionary=manualDataDictionary,
        memMonitor=memMonitor,
    )
    super(POOLMergeExecutor, self).__init__(name, **forwardedArgs)
1880
def preExecute(self, input = set(), output = set()):
    """Record the pre-execution start time, then run the parent class pre-execution.

    @param input Set of input data types for this executor.
    @param output Set of output data types for this executor.
    """
    self.setPreExeStart()
    parentExecutor = super(POOLMergeExecutor, self)
    parentExecutor.preExecute(input=input, output=output)
1884
1885
def execute(self):
    """Run the merge by delegating to the parent class, which manages the athena execution."""
    parentExecutor = super(POOLMergeExecutor, self)
    parentExecutor.execute()
1889
1890
1891
1894
def preExecute(self, input=set(), output=set()):
    """Expand the DAOD/D2AOD stub output into one real output per requested format.

    Unless ``NTUP_PILEUP`` is among the outputs, the ``formats`` argument and a
    ``DAOD`` or ``D2AOD`` stub output are required. For each format, a new
    ``argPOOLFile`` output named ``DAOD_<format>.<outputDAODFile>`` or
    ``D2AOD_<format>.<outputD2AODFile>`` is registered, both in ``output`` and
    in the data dictionary. The stub entries and their ``outputDAODFile`` /
    ``outputD2AODFile`` arguments are then removed before the parent
    ``preExecute`` is called.

    @param input Set of input data types for this executor.
    @param output Set of output data types; modified in place.
    @throws trfExceptions.TransformExecutionException (TRF_REDUCTION_CONFIG_ERROR)
        if no derivation formats or no DAOD/D2AOD base output is configured.
    """
    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
    if 'NTUP_PILEUP' not in output:
        # New derivation framework transform uses "formats"
        if 'formats' not in self.conf.argdict:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
                                                            'No derivation configuration specified')

        if ('DAOD' not in output) and ('D2AOD' not in output):
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_REDUCTION_CONFIG_ERROR'),
                                                            'No base name for DAOD output')

        # 'formats' is guaranteed to be present here (checked above)
        for reduction in self.conf.argdict['formats'].value:
            if 'DAOD' in output:
                dataType = 'DAOD_' + reduction
                outputFormat = reduction
                if 'augmentations' in self.conf.argdict:
                    # An augmentation "<a>:<b>" makes a matching reduction write to the
                    # file named after <b>.
                    # NOTE(review): this is a substring test on <a> (e.g. 'JETM1' matches
                    # 'JETM10'), kept as in the original - confirm whether equality is intended.
                    for val in self.conf.argdict['augmentations'].value:
                        if reduction in val.split(':')[0]:
                            outputFormat = val.split(':')[1]
                            break
                outputName = 'DAOD_' + outputFormat + '.' + self.conf.argdict['outputDAODFile'].value[0]

            # NOTE(review): if both DAOD and D2AOD stubs are present, the D2AOD names
            # below overwrite the DAOD ones, so only D2AOD_<format> gets registered.
            if 'D2AOD' in output:
                dataType = 'D2AOD_' + reduction
                outputName = 'D2AOD_' + reduction + '.' + self.conf.argdict['outputD2AODFile'].value[0]

            msg.info('Adding reduction output type {0}'.format(dataType))
            output.add(dataType)
            newReduction = trfArgClasses.argPOOLFile(outputName, io='output', runarg=True, type='AOD',
                                                     name=reduction)
            # References to _trf - can this be removed?
            self.conf.dataDictionary[dataType] = newReduction

        # Clean up the stub file from the executor output and the transform's data dictionary
        # (we don't remove the actual argFile instance)
        if 'DAOD' in output:
            output.remove('DAOD')
            del self.conf.dataDictionary['DAOD']
            del self.conf.argdict['outputDAODFile']
        if 'D2AOD' in output:
            output.remove('D2AOD')
            del self.conf.dataDictionary['D2AOD']
            del self.conf.argdict['outputD2AODFile']

    # Logged once for both the derivation and the NTUP_PILEUP paths
    msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
    msg.info('Input/Output: {0}/{1}'.format(input, output))
    super(reductionFrameworkExecutor, self).preExecute(input, output)
1952
1953
1954
def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']),
             exe='DQHistogramMerge.py', exeArgs = [], memMonitor = True):
    """Set up the DQ histogram merge executor.

    The name of the text file that lists the histogram files to merge (written
    by preExecute) is stored in ``self._histMergeList``. All constructor
    arguments are forwarded, by keyword, to the parent class.
    """
    self._histMergeList = 'HISTMergeList.txt'

    parentArgs = dict(name=name, trf=trf, conf=conf, inData=inData, outData=outData,
                      exe=exe, exeArgs=exeArgs, memMonitor=memMonitor)
    super(DQMergeExecutor, self).__init__(**parentArgs)
1963
1964
def preExecute(self, input = set(), output = set()):
    """Write the histogram merge list and append the merge arguments to the command.

    After the parent pre-execution, the command receives, in order:
      - the merge-list file name;
      - the single output file name;
      - the run_post_processing and is_incremental_merge flags as 'True'/'False';
      - optional --excludeHist / --excludeDir options.

    @param input Set of input data types whose files are listed for merging.
    @param output Set of output data types; must hold exactly one entry.
    @throws trfExceptions.TransformExecutionException (TRF_EXEC_SETUP_FAIL)
        if the number of outputs is not exactly one.
    """
    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    super(DQMergeExecutor, self).preExecute(input=input, output=output)

    # One file name per line; _getNumberOfEvents is invoked for each file as it is listed
    with open(self._histMergeList, 'w') as mergeListFile:
        for dataType in input:
            histArg = self.conf.dataDictionary[dataType]
            for histFile in histArg.value:
                histArg._getNumberOfEvents([histFile])
                print(histFile, file=mergeListFile)

    self._cmd.append(self._histMergeList)

    # Exactly one output file is accepted
    if len(output) != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
            'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
    (outDataType,) = output
    self._cmd.append(self.conf.dataDictionary[outDataType].value[0])

    # Positional flags: run_post_processing first, then is_incremental_merge.
    # NOTE(review): this tests the truthiness of the stored object itself, not its
    # .value - confirm that a False-valued argument object evaluates as false.
    for flagName in ("run_post_processing", "is_incremental_merge"):
        flagIsSet = self.conf._argdict.get(flagName, False)
        self._cmd.append('True' if flagIsSet else 'False')

    for optionName in ("excludeHist", "excludeDir"):
        if optionName in self.conf._argdict:
            self._cmd.append("--{0}={1}".format(optionName, self.conf._argdict[optionName]))
2001
2002
2003 def validate(self):
2004 self.setValStart()
2005 super(DQMergeExecutor, self).validate()
2006
2007 exitErrorMessage = ''
2008 # Base class validation successful, Now scan the logfile for missed errors.
2009 try:
2011 worstError = logScan.worstError()
2012
2013 # In general we add the error message to the exit message, but if it's too long then don't do
2014 # that and just say look in the jobReport
2015 if worstError['firstError']:
2016 if len(worstError['firstError']['message']) > logScan._msgLimit:
2017 exitErrorMessage = "Long {0} message at line {1}" \
2018 " (see jobReport for further details)".format(worstError['level'],
2019 worstError['firstError']['firstLine'])
2020 else:
2021 exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
2022 worstError['firstError']['message'])
2023 except OSError as e:
2024 exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2026 'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))
2027
2028 if worstError['nLevel'] == stdLogLevels['ERROR'] and (
2029 'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
2030 msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2031
2032 elif worstError['nLevel'] >= stdLogLevels['ERROR']:
2033 self._isValidated = False
2034 msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
2035 exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2036 raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))
2037
2038 # Must be ok if we got here!
2039 msg.info('Executor {0} has validated successfully'.format(self.name))
2040 self._isValidated = True
2041
2042 self._valStop = os.times()
2043 msg.debug('valStop time is {0}'.format(self._valStop))
2044
2045
def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']),
             exe='DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor = True):
    """Set up the DQM post-processing executor.

    Sets ``self._histMergeList`` to 'HISTMergeList.txt' and forwards all
    constructor arguments, by keyword, to the parent class.
    """
    self._histMergeList = 'HISTMergeList.txt'

    parentArgs = dict(name=name, trf=trf, conf=conf, inData=inData, outData=outData,
                      exe=exe, exeArgs=exeArgs, memMonitor=memMonitor)
    super(DQMPostProcessExecutor, self).__init__(**parentArgs)
2054
2055
def preExecute(self, input = set(), output = set()):
    """Write args.json for the DQM wrapper and add ``--argJSON=args.json`` to the command.

    Each input file is listed as ``<dataset>#<file>``. If the inputHISTFile
    argument has no dataset name, the dataset is guessed from the first four
    dot-separated fields of the first input file name.

    @param input Set of input data types.
    @param output Set of output data types; must hold exactly one entry.
    @throws trfExceptions.TransformExecutionException (TRF_EXEC_SETUP_FAIL)
        if the number of outputs is not exactly one.
    """
    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    super(DQMPostProcessExecutor, self).preExecute(input=input, output=output)

    # Build the input file list (typically only one file)
    dsName = self.conf.argdict["inputHISTFile"].dataset
    inputList = []
    for dataType in input:
        for fname in self.conf.dataDictionary[dataType].value:
            # If no dataset name is given, guess it from the file name
            if not dsName:
                dsName = ".".join(fname.split('.')[0:4])
            inputList.append("#".join([dsName, fname]))

    if len(output) != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
            'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
    outDataType = list(output)[0]

    argdict = self.conf._argdict

    def boolString(key):
        # NOTE(review): tests the truthiness of the stored object itself, not its .value - confirm
        return "True" if argdict.get(key, False) else "False"

    # Build the argument json:
    # T0 uses keys: 'allowCOOLUpload', 'doWebDisplay', 'incrementalMode', 'inputHistFiles', 'mergeParams', 'outputHistFile', 'postProcessing', 'reverseFileOrder', 'servers'
    # more keys: runNumber, streamName, projectTag, filepaths, productionMode, skipMerge
    wrapperParams = {"inputHistFiles" : inputList,
                     "outputHistFile" : dsName + "#" + self.conf.dataDictionary[outDataType].value[0],
                     "incrementalMode": boolString("is_incremental_merge"),
                     "postProcessing" : boolString("run_post_processing"),
                     "doWebDisplay"   : boolString("doWebDisplay"),
                     "allowCOOLUpload": boolString("allowCOOLUpload"),
                     "mergeParams"    : "",
                     }
    if "servers" in argdict:
        servers = argdict["servers"]
        # The wrapper key is 'servers' (see the T0 key list above). An argument
        # object is unwrapped to its value so that json.dump can serialise it.
        wrapperParams["servers"] = getattr(servers, 'value', servers)

    for k in ("excludeHist", "excludeDir"):
        if k in argdict:
            wrapperParams["mergeParams"] += " --{0}={1}".format(k, argdict[k])

    with open("args.json", "w") as f:
        json.dump(wrapperParams, f)

    self._cmd.append("--argJSON=args.json")
2100
2101
2102
2103 def validate(self):
2104 self.setValStart()
2105 super(DQMPostProcessExecutor, self).validate()
2106
2107 exitErrorMessage = ''
2108 # Base class validation successful, Now scan the logfile for missed errors.
2109 try:
2111 worstError = logScan.worstError()
2112
2113 # In general we add the error message to the exit message, but if it's too long then don't do
2114 # that and just say look in the jobReport
2115 if worstError['firstError']:
2116 if len(worstError['firstError']['message']) > logScan._msgLimit:
2117 exitErrorMessage = "Long {0} message at line {1}" \
2118 " (see jobReport for further details)".format(worstError['level'],
2119 worstError['firstError']['firstLine'])
2120 else:
2121 exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName,
2122 worstError['firstError']['message'])
2123 except OSError as e:
2124 exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2126 'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))
2127
2128 if worstError['nLevel'] == stdLogLevels['ERROR'] and (
2129 'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
2130 msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2131
2132 elif worstError['nLevel'] >= stdLogLevels['ERROR']:
2133 self._isValidated = False
2134 msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
2135 exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
2136 raise trfExceptions.TransformLogfileErrorException(exitCode, 'Fatal error in script logfile: "{0}"'.format(exitErrorMessage))
2137
2138 # Must be ok if we got here!
2139 msg.info('Executor {0} has validated successfully'.format(self.name))
2140 self._isValidated = True
2141
2142 self._valStop = os.times()
2143 msg.debug('valStop time is {0}'.format(self._valStop))
2144
2146
def preExecute(self, input = set(), output = set()):
    """Build the NTUP merge command ``<exe> -f <output file> <input files...>``.

    The executable defaults to 'hadd' when none is configured. The ``-f`` flag
    allows an existing output file to be overwritten. Every file of every input
    data type is passed as an input.

    @param input Set of input data types to merge.
    @param output Set of output data types; must hold exactly one entry.
    @throws trfExceptions.TransformExecutionException (TRF_EXEC_SETUP_FAIL)
        if the number of outputs is not exactly one.
    """
    self.setPreExeStart()
    msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    if self._exe is None:
        self._exe = 'hadd'
    mergeCommand = [self._exe, "-f"]
    self._cmd = mergeCommand

    # The output file comes first, right after the flags
    if len(output) != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
            'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
    (outDataType,) = output
    mergeCommand.append(self.conf.dataDictionary[outDataType].value[0])

    for dataType in input:
        mergeCommand.extend(self.conf.dataDictionary[dataType].value)

    super(NTUPMergeExecutor, self).preExecute(input=input, output=output)
2168
2169
2171 """Executor for running physvalPostProcessing.py with <input> <output> args"""
2172
def preExecute(self, input = set(), output = set()):
    """Build the command ``<exe> <input file> <output file>`` for post-processing.

    Only the first file of the single input and single output data types is used.

    @param input Set of input data types; must hold exactly one entry.
    @param output Set of output data types; must hold exactly one entry.
    @throws trfExceptions.TransformExecutionException (TRF_EXEC_SETUP_FAIL)
        unless exactly one input and one output are given.
    """
    self.setPreExeStart()
    msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    self._cmd = [self.exe]

    nInputs, nOutputs = len(input), len(output)
    if nInputs != 1 or nOutputs != 1:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
            f'Exactly one input and one output must be specified (got inputs={nInputs}, outputs={nOutputs})')

    (inDataType,) = input
    (outDataType,) = output
    for dataType in (inDataType, outDataType):
        self._cmd.append(self.conf.dataDictionary[dataType].value[0])

    # Finalize execution setup by calling the parent class method
    super(NtupPhysValPostProcessingExecutor, self).preExecute(input=input, output=output)
2188
2189
2191
2192 def preExecute(self, input = set(), output = set()):
2193 self.setPreExeStart()
2194 self._inputBS = list(input)[0]
2195 self._outputBS = list(output)[0]
2197 self._useStubFile = False
2198 if 'maskEmptyInputs' in self.conf.argdict and self.conf.argdict['maskEmptyInputs'].value is True:
2199 eventfullFiles = []
2200 for fname in self.conf.dataDictionary[self._inputBS].value:
2201 nEvents = self.conf.dataDictionary[self._inputBS].getSingleMetadata(fname, 'nentries')
2202 msg.debug('Found {0} events in file {1}'.format(nEvents, fname))
2203 if isinstance(nEvents, int) and nEvents > 0:
2204 eventfullFiles.append(fname)
2205 self._maskedFiles = list(set(self.conf.dataDictionary[self._inputBS].value) - set(eventfullFiles))
2206 if len(self._maskedFiles) > 0:
2207 msg.info('The following input files are masked because they have 0 events: {0}'.format(' '.join(self._maskedFiles)))
2208 if len(eventfullFiles) == 0:
2209 if 'emptyStubFile' in self.conf.argdict and path.exists(self.conf.argdict['emptyStubFile'].value):
2210 self._useStubFile = True
2211 msg.info("All input files are empty - will use stub file {0} as output".format(self.conf.argdict['emptyStubFile'].value))
2212 else:
2213 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
2214 'All input files had zero events - aborting BS merge')
2215
2216 # Write the list of input files to a text file, so that testMergedFiles can swallow it
2217 self._mergeBSFileList = '{0}.list'.format(self._name)
2218 self._mergeBSLogfile = '{0}.out'.format(self._name)
2219 try:
2220 with open(self._mergeBSFileList, 'w') as BSFileList:
2221 for fname in self.conf.dataDictionary[self._inputBS].value:
2222 if fname not in self._maskedFiles:
2223 print(fname, file=BSFileList)
2224 except OSError as e:
2225 errMsg = 'Got an error when writing list of BS files to {0}: {1}'.format(self._mergeBSFileList, e)
2226 msg.error(errMsg)
2227 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'), errMsg)
2228
2229 # Hope that we were given a correct filename...
2230 self._outputFilename = self.conf.dataDictionary[self._outputBS].value[0]
2231 if self._outputFilename.endswith('._0001.data'):
2232 self._doRename = False
2233 self._outputFilename = self._outputFilename.split('._0001.data')[0]
2234 elif self.conf.argdict['allowRename'].value is True:
2235 # OK, non-fatal, we go for a renaming
2236 msg.info('Output filename does not end in "._0001.data" will proceed, but be aware that the internal filename metadata will be wrong')
2237 self._doRename = True
2238 else:
2239 # No rename allowed, so we are dead...
2240 errmsg = 'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'
2241 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'), errmsg)
2242
2243 # Set the correct command for execution
2244 self._cmd = [self._exe, self._mergeBSFileList, '0', self._outputFilename]
2245
2246 super(bsMergeExecutor, self).preExecute(input=input, output=output)
2247
def execute(self):
    """Run the bytestream merge, or fake it when the empty stub file is used.

    In stub mode no merge executable is run. Instead:
      - the stub file is renamed to the declared output file name;
      - the memory monitor is disabled;
      - the executor is marked as executed with return code 0.
    Otherwise the parent class executes the prepared command.

    @throws trfExceptions.TransformExecutionException (TRF_OUTPUT_FILE_ERROR)
        if the stub file cannot be renamed.
    """
    if not self._useStubFile:
        super(bsMergeExecutor, self).execute()
        return

    # Need to fake execution!
    self._exeStart = os.times()
    msg.debug('exeStart time is {0}'.format(self._exeStart))
    msg.info("Using stub file for empty BS output - execution is fake")
    stubFile = self.conf.argdict['emptyStubFile'].value
    # Rename to the declared output name rather than self._outputFilename. When the
    # declared name ends in "._0001.data", _outputFilename has that suffix stripped
    # (it is the merge tool's base name), and postExecute does no renaming in stub mode.
    outputFile = self.conf.dataDictionary[self._outputBS].value[0]
    if outputFile != stubFile:
        try:
            os.rename(stubFile, outputFile)
        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                'Exception raised when renaming stub file {0} to {1}: {2}'.format(stubFile, outputFile, e)) from e
    self._memMonitor = False
    self._hasExecuted = True
    self._rc = 0
    self._exeStop = os.times()
    msg.debug('exeStop time is {0}'.format(self._exeStop))
2263
def postExecute(self):
    """Rename the merged file to its declared name when required, then run the parent postExecute.

    No rename happens in stub mode, or when the declared output name already
    ended in "._0001.data" (``_doRename`` is False). Otherwise the merge output
    ``<_outputFilename>._0001.data`` is renamed to the declared output file.

    @throws trfExceptions.TransformExecutionException (TRF_OUTPUT_FILE_ERROR)
        if the rename fails.
    """
    if not self._useStubFile and self._doRename:
        self._expectedOutput = self._outputFilename + '._0001.data'
        finalName = self.conf.dataDictionary[self._outputBS].value[0]
        msg.info('Renaming {0} to {1}'.format(self._expectedOutput, finalName))
        try:
            os.rename(self._expectedOutput, finalName)
        except OSError as e:
            # Report the file we actually tried to rename, including its suffix
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                'Exception raised when renaming {0} to {1}: {2}'.format(self._expectedOutput, finalName, e)) from e
    super(bsMergeExecutor, self).postExecute()
2276
2277
2278
2280
def preExecute(self, input = set(), output = set()):
    """Generate and schedule a python wrapper script that archives or unarchives files.

    With ``exe == 'zip'``, files in ``inputDataFile`` are stored uncompressed
    (ZIP_STORED) in ``outputArchFile``:
      - the archive is appended to if it already exists;
      - inputs that are zip files (checked by content and by a '.zip' in the
        name) are extracted to 'tmp' first, and their contents are stored;
      - input files are deleted once extracted or stored.

    With ``exe == 'unarchive'``, every ``inputArchFile`` is extracted into the
    ``path`` argument.

    In both cases the command becomes ``python <wrapper>``. The memory monitor
    is disabled.

    @throws trfExceptions.TransformExecutionException with TRF_ARG_MISSING (no
        outputArchFile), TRF_INPUT_FILE_ERROR (an input is not a zip archive) or
        TRF_EXEC_SETUP_WRAPPER (the wrapper script cannot be written).
    """
    self.setPreExeStart()
    self._memMonitor = False

    def writeWrapper(fileName, description, scriptLines):
        # Write the generated script, make it executable, and map I/O errors to a transform error
        try:
            with open(fileName, 'w') as wrapper:
                wrapper.write('\n'.join(scriptLines) + '\n')
            os.chmod(fileName, 0o755)
        except OSError as e:
            errMsg = 'error writing {description} {fileName}: {error}'.format(description = description,
                                                                             fileName = fileName,
                                                                             error = e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
                                                            errMsg) from e

    # Archiving. The generated scripts run under python3, so they must use print().
    if self._exe == 'zip':
        if 'outputArchFile' not in self.conf.argdict:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_MISSING'), 'Missing output file name')

        archiveName = self.conf.argdict['outputArchFile'].value[0]
        # Append input file(s) to an existing archive, otherwise create a new one
        zipMode = 'a' if os.path.exists(archiveName) else 'w'
        zipScript = [
            "import zipfile, os, shutil",
            "zf = zipfile.ZipFile({0!r}, mode={1!r}, allowZip64=True)".format(archiveName, zipMode),
            "for f in {0!r}:".format(self.conf.argdict['inputDataFile'].value),
            # zipfile.is_zipfile gives false positives (as of python 3.7.0), so also check the name for ".zip"
            "    if zipfile.is_zipfile(f) and '.zip' in f:",
            "        archive = zipfile.ZipFile(f, mode='r')",
            "        print('Extracting input zip file {0} to temporary directory {1}'.format(f,'tmp'))",
            "        archive.extractall('tmp')",
            "        archive.close()",
            # Remove inputs as soon as they are no longer needed, to save disk space on the worker node
            "        if os.access(f, os.F_OK):",
            "            print('Removing input zip file {}'.format(f))",
            "            os.unlink(f)",
            "        if os.path.isdir('tmp'):",
            "            for root, dirs, files in os.walk('tmp'):",
            "                for name in files:",
            "                    print('Zipping {}'.format(name))",
            "                    zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)",
            "            shutil.rmtree('tmp')",
            "    else:",
            "        print('Zipping {}'.format(os.path.basename(f)))",
            "        zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)",
            "        if os.access(f, os.F_OK):",
            "            print('Removing input file {}'.format(f))",
            "            os.unlink(f)",
            "zf.close()",
        ]
        self._cmd = ['python']
        writeWrapper('zip_wrapper.py', 'zip wrapper', zipScript)
        self._cmd.append('zip_wrapper.py')

    # Unarchiving
    elif self._exe == 'unarchive':
        import zipfile
        for infile in self.conf.argdict['inputArchFile'].value:
            if not zipfile.is_zipfile(infile):
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
                                                                'An input file is not a zip archive - aborting unpacking')
        pathArg = self.conf.argdict['path']
        # Use the argument's value; fall back to the object itself if it is a plain string
        extractPath = getattr(pathArg, 'value', pathArg)
        unarchiveScript = [
            "import zipfile",
            "for f in {0!r}:".format(self.conf.argdict['inputArchFile'].value),
            "    archive = zipfile.ZipFile(f, mode='r')",
            "    path = {0!r}".format(extractPath),
            "    print('Extracting archive {0} to {1}'.format(f,path))",
            "    archive.extractall(path)",
            "    archive.close()",
        ]
        self._cmd = ['python']
        writeWrapper('unarchive_wrapper.py', 'unarchive wrapper', unarchiveScript)
        self._cmd.append('unarchive_wrapper.py')
    super(archiveExecutor, self).preExecute(input=input, output=output)
2364
static Double_t rc
void print(char *figname, TCanvas *c1)
#define min(a, b)
Definition cfImp.cxx:40
Argument class for substep lists, suitable for preExec/postExec.
Class holding the update to an environment that will be passed on to an executor.
Definition trfEnv.py:15
Base class for execution exceptions.
Exception class for validation failures detected by parsing logfiles.
Specialist execution class for DQM post-processing of histograms.
Definition trfExe.py:2046
__init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']), exe='DQM_Tier0Wrapper_tf.py', exeArgs=[], memMonitor=True)
Definition trfExe.py:2048
preExecute(self, input=set(), output=set())
Definition trfExe.py:2056
Specialist execution class for merging DQ histograms.
Definition trfExe.py:1955
__init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']), exe='DQHistogramMerge.py', exeArgs=[], memMonitor=True)
Definition trfExe.py:1957
preExecute(self, input=set(), output=set())
Definition trfExe.py:1965
Specialist execution class for merging NTUPLE files.
Definition trfExe.py:2145
preExecute(self, input=set(), output=set())
Definition trfExe.py:2147
Specialist execution class for running post processing on merged PHYVAL NTUPLE file.
Definition trfExe.py:2170
preExecute(self, input=set(), output=set())
Definition trfExe.py:2173
preExecute(self, input=set(), output=set())
Definition trfExe.py:1881
__init__(self, name='hybridPOOLMerge', trf=None, conf=None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton', inData=set(), outData=set(), exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, manualDataDictionary=None, memMonitor=True)
Initialise hybrid POOL merger athena executor.
Definition trfExe.py:1873
Archive transform.
Definition trfExe.py:2279
preExecute(self, input=set(), output=set())
Definition trfExe.py:2281
_writeAthenaWrapper(self, asetup=None, dbsetup=None, ossetup=None)
Write a wrapper script which runs asetup and then Athena.
Definition trfExe.py:1577
_smartMerge(self, fileArg)
Manage smart merging of output files.
Definition trfExe.py:1754
preExecute(self, input=set(), output=set())
Definition trfExe.py:979
__init__(self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
Initialise athena executor.
Definition trfExe.py:892
_prepAthenaCommandLine(self)
Prepare the correct command line to be used to invoke athena.
Definition trfExe.py:1460
_isCAEnabled(self)
Check if running with CA.
Definition trfExe.py:1440
_skeletonCA
Handle MPI setup.
Definition trfExe.py:908
_tryDropAndReload
Add --drop-and-reload if possible (and allowed!).
Definition trfExe.py:896
_envUpdate
Look for environment updates and prepare the athena command line.
Definition trfExe.py:1232
Specialise the script executor to deal with the BS merge oddity of excluding empty DRAWs.
Definition trfExe.py:2190
preExecute(self, input=set(), output=set())
Definition trfExe.py:2192
__init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Definition trfExe.py:596
__init__(self, name='Echo', trf=None)
Definition trfExe.py:574
setFromTransform(self, trf)
Set configuration properties from the parent transform.
Definition trfExe.py:113
addToArgdict(self, key, value)
Add a new object to the argdict.
Definition trfExe.py:118
addToDataDictionary(self, key, value)
Add a new object to the dataDictionary.
Definition trfExe.py:122
__init__(self, argdict={}, dataDictionary={}, firstExecutor=False)
Configuration for an executor.
Definition trfExe.py:63
Special executor that will enable a logfile scan as part of its validation.
Definition trfExe.py:506
__init__(self, name='Logscan')
Definition trfExe.py:507
preExecute(self, input=set(), output=set())
Definition trfExe.py:512
Athena executor where failure is not considered fatal.
Definition trfExe.py:1841
Specialist executor to manage the handling of multiple implicit input and output files within the der...
Definition trfExe.py:1893
preExecute(self, input=set(), output=set())
Take inputDAODFile and setup the actual outputs needed in this job.
Definition trfExe.py:1896
preExecute(self, input=set(), output=set())
Definition trfExe.py:658
__init__(self, name='Script', trf=None, conf=None, inData=set(), outData=set(), exe=None, exeArgs=None, memMonitor=True)
Definition trfExe.py:621
Executors only ever execute a single step, as seen by the transform.
Definition trfExe.py:127
myMerger(self)
Now define properties for these data members.
Definition trfExe.py:207
__init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
Base class initialiser for transform executors.
Definition trfExe.py:138
conf
Executor configuration:
Definition trfExe.py:163
preExecute(self, input=set(), output=set())
Definition trfExe.py:471
doAll(self, input=set(), output=set())
Convenience function.
Definition trfExe.py:499
Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGL...
Small class used for validating event counts between input and output files.
Class of patterns that can be ignored from athena logfiles.
STL class.
std::vector< std::string > intersection(std::vector< std::string > &v1, std::vector< std::string > &v2)
bool add(const std::string &hname, TKey *tobj)
Definition fastadd.cxx:55
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:179
Transform argument class definitions.
Module for transform exit codes.
Contains functions related Athena Job Options files.
Logging configuration for ATLAS job transforms.
Utilities for handling MPI-Athena jobs.
Utilities for handling AthenaMP jobs.
Utilities for handling AthenaMT jobs.
Transform utility functions.
Validation control for job transforms.
Definition index.py:1
_encoding_stream(s)
Definition trfExe.py:45