20from fnmatch
import fnmatch
21msg = logging.getLogger(__name__)
25 cvmfsDBReleaseCheck, forceToAlphaNum, \
26 ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
27from PyJobTransforms.trfExeStepTools
import commonExecutorStepName, executorStepSuffix
33import PyJobTransforms.trfExceptions
as trfExceptions
36import PyJobTransforms.trfEnv
as trfEnv
46 enc = s.encoding.lower()
47 if enc.find(
'ascii') >= 0
or enc.find(
'ansi') >= 0:
48 return open (s.fileno(),
'w', encoding=
'utf-8')
63 def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False):
82 @dataDictionary.setter
106 @totalExecutorSteps.setter
138 def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData =
set()):
155 if len(dataOverlap) > 0:
157 'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self.
_name,
' '.join(dataOverlap)))
167 self.
conf.setFromTransform(trf)
220 if '_substep' in dir(self):
226 if '_trf' in dir(self):
237 if '_inData' in dir(self):
247 if '_inData' in dir(self):
257 if '_outData' in dir(self):
267 if '_outData' in dir(self):
278 if '_input' in dir(self):
287 if '_output' in dir(self):
326 if hasattr(self,
'_first'):
464 msg.debug(
'preExeStart time is {0}'.format(self.
_preExeStart))
469 msg.debug(
'valStart time is {0}'.format(self.
_valStart))
473 msg.info(
'Preexecute for %s', self.
_name)
477 msg.debug(
'exeStart time is {0}'.format(self.
_exeStart))
478 msg.info(
'Starting execution of %s', self.
_name)
482 msg.info(
'%s executor returns %d', self.
_name, self.
_rc)
484 msg.debug(
'preExeStop time is {0}'.format(self.
_exeStop))
487 msg.info(
'Postexecute for %s', self.
_name)
492 msg.info(
'Executor %s has no validation function - assuming all ok', self.
_name)
496 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
508 super(logscanExecutor, self).
__init__(name=name)
514 msg.info(
'Preexecute for %s', self.
_name)
515 if 'logfile' in self.
conf.argdict:
520 msg.info(
"Starting validation for {0}".format(self.
_name))
524 if 'ignorePatterns' in self.
conf.argdict:
525 igPat = self.
conf.argdict[
'ignorePatterns'].value
528 if 'ignoreFiles' in self.
conf.argdict:
536 msg.info(
'Scanning logfile {0} for errors'.format(self.
_logFileName))
538 worstError = self.
_logScan.worstError()
542 if worstError[
'firstError']:
543 if len(worstError[
'firstError'][
'message']) > athenaExecutor._exitMessageLimit:
544 if 'CoreDumpSvc' in worstError[
'firstError'][
'message']:
545 exitErrorMessage =
"Core dump at line {0} (see jobReport for further details)".format(worstError[
'firstError'][
'firstLine'])
546 elif 'G4Exception' in worstError[
'firstError'][
'message']:
547 exitErrorMessage =
"G4 exception at line {0} (see jobReport for further details)".format(worstError[
'firstError'][
'firstLine'])
549 exitErrorMessage =
"Long {0} message at line {1} (see jobReport for further details)".format(worstError[
'level'], worstError[
'firstError'][
'firstLine'])
551 exitErrorMessage =
"Logfile error in {0}: \"{1}\"".format(self.
_logFileName, worstError[
'firstError'][
'message'])
553 exitErrorMessage =
"Error level {0} found (see athena logfile for details)".format(worstError[
'level'])
556 if worstError[
'nLevel'] == stdLogLevels[
'ERROR']
and (
'ignoreErrors' in self.
conf.argdict
and self.
conf.argdict[
'ignoreErrors'].value
is True):
557 msg.warning(
'Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
558 elif worstError[
'nLevel'] >= stdLogLevels[
'ERROR']:
560 msg.error(
'Fatal error in athena logfile (level {0})'.format(worstError[
'level']))
562 'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
565 msg.info(
'Executor {0} has validated successfully'.format(self.
name))
570 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
577 super(echoExecutor, self).
__init__(name=name, trf=trf)
582 msg.debug(
'exeStart time is {0}'.format(self.
_exeStart))
583 msg.info(
'Starting execution of %s', self.
_name)
584 msg.info(
'Transform argument dictionary now follows:')
585 for k, v
in self.
conf.argdict.items():
586 print(
"%s = %s" % (k, v))
590 msg.info(
'%s executor returns %d', self.
_name, self.
_rc)
592 msg.debug(
'exeStop time is {0}'.format(self.
_exeStop))
596 def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData =
set()):
599 super(dummyExecutor, self).
__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
604 msg.debug(
'exeStart time is {0}'.format(self.
_exeStart))
605 msg.info(
'Starting execution of %s', self.
_name)
607 for k, v
in self.
conf.argdict.items():
609 msg.info(
'Creating dummy output file: {0}'.format(self.
conf.argdict[k].value[0]))
610 open(self.
conf.argdict[k].value[0],
'a').close()
614 msg.info(
'%s executor returns %d', self.
_name, self.
_rc)
616 msg.debug(
'exeStop time is {0}'.format(self.
_exeStop))
620 def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData =
set(),
621 exe =
None, exeArgs =
None, memMonitor =
True):
628 super(scriptExecutor, self).
__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
660 msg.debug(
'scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
666 if self.
_cmd is None:
668 msg.info(
'Will execute script as %s', self.
_cmd)
675 if 'TRF_ECHO' in os.environ:
676 msg.info(
'TRF_ECHO envvar is set - enabling command echoing to stdout')
678 elif 'TRF_NOECHO' in os.environ:
679 msg.info(
'TRF_NOECHO envvar is set - disabling command echoing to stdout')
682 elif isInteractiveEnv():
683 msg.info(
'Interactive environment detected (stdio or stdout is a tty) - enabling command echoing to stdout')
685 elif 'TZHOME' in os.environ:
686 msg.info(
'Tier-0 environment detected - enabling command echoing to stdout')
689 msg.info(
'Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self.
_name, self.
_logFileName))
696 encargs = {
'encoding' :
'utf-8'}
698 self.
_exeLogFile.setFormatter(logging.Formatter(
'%(asctime)s %(message)s', datefmt=
'%H:%M:%S'))
703 self.
_echostream.setFormatter(logging.Formatter(
'%(name)s %(asctime)s %(message)s', datefmt=
'%H:%M:%S'))
711 'No executor set in {0}'.format(self.__class__.__name__))
713 if arg
in self.
conf.argdict:
717 if isinstance(self.
conf.argdict[arg].value, list):
718 self.
_cmd.extend([
str(v)
for v
in self.
conf.argdict[arg].value])
720 self.
_cmd.append(
str(self.
conf.argdict[arg].value))
725 msg.info(
'Starting execution of {0} ({1})'.format(self.
_name, self.
_cmd))
728 msg.debug(
'exeStart time is {0}'.format(self.
_exeStart))
729 if (
'execOnly' in self.
conf.argdict
and self.
conf.argdict[
'execOnly']
is True):
730 msg.info(
'execOnly flag is set - execution will now switch, replacing the transform')
733 encargs = {
'encoding' :
'utf8'}
739 msg.info(
"chdir /srv to launch a nested container for the substep")
741 p = subprocess.Popen(self.
_cmd, shell =
False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
744 msg.info(
"chdir {} after launching the nested container".format(self.
_workdir))
751 memMonitorCommand = [
'prmon',
'--pid',
str(p.pid),
'--filename',
'prmon.full.' + self.
_name,
754 mem_proc = subprocess.Popen(memMonitorCommand, shell =
False, close_fds=
True, **encargs)
756 except Exception
as e:
757 msg.warning(
'Failed to spawn memory monitor for {0}: {1}'.format(self.
_name, e))
760 while p.poll()
is None:
761 line = p.stdout.readline()
765 for line
in p.stdout:
769 msg.info(
'%s executor returns %d', self.
_name, self.
_rc)
771 msg.debug(
'exeStop time is {0}'.format(self.
_exeStop))
773 errMsg =
'Execution of {0} failed and raised OSError: {1}'.format(self.
_cmd[0], e)
779 mem_proc.send_signal(signal.SIGUSR1)
781 while (
not mem_proc.poll())
and countWait < 10:
795 except Exception
as e:
796 msg.warning(
'Failed to load JSON memory summmary file {0}: {1}'.format(self.
_memSummaryFile, e))
804 msg.debug(
'valStart time is {0}'.format(self.
_valStart))
809 msg.info(
'Executor {0} validated successfully (return code {1})'.format(self.
_name, self.
_rc))
820 if trfExit.codeToSignalname(self.
_rc) !=
"":
821 self.
_errMsg =
'{0} got a {1} signal (exit code {2})'.format(self.
_name, trfExit.codeToSignalname(self.
_rc), self.
_rc)
823 self.
_errMsg =
'Non-zero return code from %s (%d)' % (self.
_name, self.
_rc)
828 if 'checkEventCount' in self.
conf.argdict
and self.
conf.argdict[
'checkEventCount'].returnMyValue(exe=self)
is False:
829 msg.info(
'Event counting for substep {0} is skipped'.format(self.
name))
831 if 'mpi' in self.
conf.argdict
and not mpi.mpiShouldValidate():
832 msg.info(
'MPI mode -- skipping output event count check')
837 msg.info(
'Event counting for substep {0} passed'.format(self.
name))
840 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
845 _exitMessageLimit = 200
846 _defaultIgnorePatternFile = [
'atlas_error_mask.db']
885 def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
886 inData = set(), outData =
set(), inputDataTypeCountCheck =
None, exe =
'athena.py', exeArgs = [
'athenaopts'],
887 substep =
None, inputEventTest =
True, perfMonFile =
None, tryDropAndReload =
True, extraRunargs = {}, runtimeRunargs = {},
888 literalRunargs = [], dataArgs = [], checkEventCount =
False, errorMaskFiles =
None,
889 manualDataDictionary =
None, memMonitor =
True, disableMT =
False, disableMP =
False, onlyMP =
False, onlyMT =
False, onlyMPWithRunargs =
None):
909 msg.debug(
"Resource monitoring from PerfMon is now deprecated")
912 if isinstance(skeletonFile, str):
917 super(athenaExecutor, self).
__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
918 exeArgs=exeArgs, memMonitor=memMonitor)
925 self.
_jobOptionsTemplate = JobOptionsTemplate(exe = self, version =
'$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
933 @inputDataTypeCountCheck.setter
978 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
986 if self.
conf.dataDictionary[dataType].nentries ==
'UNDEFINED':
989 thisInputEvents = self.
conf.dataDictionary[dataType].nentries
990 if thisInputEvents > inputEvents:
991 inputEvents = thisInputEvents
995 if (
'skipEvents' in self.
conf.argdict
and
996 self.
conf.argdict[
'skipEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
is not None):
997 mySkipEvents = self.
conf.argdict[
'skipEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
1001 if (
'maxEvents' in self.
conf.argdict
and
1002 self.
conf.argdict[
'maxEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
is not None):
1003 myMaxEvents = self.
conf.argdict[
'maxEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
1008 if (self.
_inputEventTest and mySkipEvents > 0
and mySkipEvents >= inputEvents):
1010 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(mySkipEvents, inputEvents, dt))
1014 if (myMaxEvents != -1):
1016 expectedEvents = myMaxEvents
1018 expectedEvents =
min(inputEvents-mySkipEvents, myMaxEvents)
1020 expectedEvents = inputEvents-mySkipEvents
1023 msg.info(
'input event count is UNDEFINED, setting expectedEvents to 0')
1029 OSSetupString =
None
1033 legacyThreadingRelease =
False
1034 if 'asetup' in self.
conf.argdict:
1035 asetupString = self.
conf.argdict[
'asetup'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
1036 legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1038 msg.info(
'Asetup report: {0}'.format(asetupReport()))
1040 if asetupString
is not None:
1041 legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1042 currentOS = os.environ[
'ALRB_USER_PLATFORM']
1043 if legacyOSRelease
and "centos7" not in currentOS:
1044 OSSetupString =
"centos7"
1045 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.format(self.
_substep, OSSetupString))
1049 if 'runInContainer' in self.
conf.argdict:
1050 OSSetupString = self.
conf.argdict[
'runInContainer'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
1051 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.format(self.
_substep, OSSetupString))
1052 if OSSetupString
is not None and asetupString
is None:
1054 '--asetup must be used for the substep which requires --runInContainer')
1059 if k
in self.
conf._argdict:
1063 if (((
'multithreaded' in self.
conf._argdict
and self.
conf._argdict[
'multithreaded'].value)
or (
'multiprocess' in self.
conf._argdict
and self.
conf._argdict[
'multiprocess'].value))
and
1064 (
'ATHENA_CORE_NUMBER' not in os.environ)):
1066 msg.warning(
'either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1078 msg.info(
"This configuration does not support MT, falling back to MP")
1086 msg.info(
"This configuration does not support MP, using MT")
1095 msg.info(
"Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self.
_athenaMP))
1100 if self.
conf.totalExecutorSteps > 1:
1101 for dataType
in output:
1102 if self.
conf._dataDictionary[dataType].originalName:
1103 self.
conf._dataDictionary[dataType].value[0] = self.
conf._dataDictionary[dataType].originalName
1105 self.
conf._dataDictionary[dataType].originalName = self.
conf._dataDictionary[dataType].value[0]
1106 self.
conf._dataDictionary[dataType].value[0] +=
"_{0}{1}".format(executorStepSuffix, self.
conf.executorStep)
1107 msg.info(
"Updated athena output filename for {0} to {1}".format(dataType, self.
conf._dataDictionary[dataType].value[0]))
1114 if 'athenaMPUseEventOrders' in self.
conf.argdict
and self.
conf._argdict[
'athenaMPUseEventOrders'].value
is True:
1119 if (
'athenaMPStrategy' in self.
conf.argdict
and
1120 (self.
conf.argdict[
'athenaMPStrategy'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
is not None)):
1126 if 'athenaMPMergeTargetSize' in self.
conf.argdict:
1127 for dataType
in output:
1128 if dataType
in self.
conf.argdict[
'athenaMPMergeTargetSize'].value:
1129 self.
conf._dataDictionary[dataType].mergeTargetSize = self.
conf.argdict[
'athenaMPMergeTargetSize'].value[dataType] * 1000000
1130 msg.info(
'Set target merge size for {0} to {1}'.format(dataType, self.
conf._dataDictionary[dataType].mergeTargetSize))
1133 matchedViaGlob =
False
1134 for mtsType, mtsSize
in self.
conf.argdict[
'athenaMPMergeTargetSize'].value.items():
1135 if fnmatch(dataType, mtsType):
1136 self.
conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000
1137 msg.info(
'Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.
conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1138 matchedViaGlob =
True
1140 if not matchedViaGlob
and "ALL" in self.
conf.argdict[
'athenaMPMergeTargetSize'].value:
1141 self.
conf._dataDictionary[dataType].mergeTargetSize = self.
conf.argdict[
'athenaMPMergeTargetSize'].value[
"ALL"] * 1000000
1142 msg.info(
'Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.
conf._dataDictionary[dataType].mergeTargetSize))
1147 for dataType
in output:
1148 if self.
conf.totalExecutorSteps <= 1:
1149 self.
conf._dataDictionary[dataType].originalName = self.
conf._dataDictionary[dataType].value[0]
1150 if 'eventService' not in self.
conf.argdict
or 'eventService' in self.
conf.argdict
and self.
conf.argdict[
'eventService'].value
is False:
1151 if 'sharedWriter' in self.
conf.argdict
and self.
conf.argdict[
'sharedWriter'].value:
1152 msg.info(
"SharedWriter: not updating athena output filename for {0}".format(dataType))
1154 self.
conf._dataDictionary[dataType].value[0] +=
"_000"
1155 msg.info(
"Updated athena output filename for {0} to {1}".format(dataType, self.
conf._dataDictionary[dataType].value[0]))
1160 if 'mpi' in self.
conf.argdict:
1161 msg.info(
"Running in MPI mode")
1162 mpi.setupMPIConfig(output, self.
conf.dataDictionary)
1167 for dataType
in input:
1168 inputFiles[dataType] = self.
conf.dataDictionary[dataType]
1169 outputFiles = dict()
1170 for dataType
in output:
1171 outputFiles[dataType] = self.
conf.dataDictionary[dataType]
1174 nameForFiles = commonExecutorStepName(self.
_name)
1175 for dataType, dataArg
in self.
conf.dataDictionary.items():
1176 if isinstance(dataArg, list)
and dataArg:
1177 if self.
conf.totalExecutorSteps <= 1:
1178 raise ValueError(
'Multiple input arguments provided but only running one substep')
1179 if self.
conf.totalExecutorSteps != len(dataArg):
1180 raise ValueError(f
'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1182 if dataArg[self.
conf.executorStep].io ==
'input' and nameForFiles
in dataArg[self.
conf.executorStep].executor:
1183 inputFiles[dataArg[self.
conf.executorStep].subtype] = dataArg
1185 if dataArg.io ==
'input' and nameForFiles
in dataArg.executor:
1186 inputFiles[dataArg.subtype] = dataArg
1188 msg.debug(
'Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1192 output = outputFiles)
1202 dbrelease = dbsetup =
None
1203 if 'DBRelease' in self.
conf.argdict:
1204 dbrelease = self.
conf.argdict[
'DBRelease'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
1205 if path.islink(dbrelease):
1206 dbrelease = path.realpath(dbrelease)
1209 dbdMatch = re.match(
r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1211 msg.debug(
'DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1212 if not os.access(dbrelease, os.R_OK):
1213 msg.warning(
'Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1214 msg.warning(
'I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1215 dbrelease = dbdMatch.group(1)
1216 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1219 msg.debug(
'Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1220 unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1223 setupDBRelease(dbsetup)
1226 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1234 super(athenaExecutor, self).
preExecute(input, output)
1239 msg.info(
'Now writing wrapper for substep executor {0}'.format(self.
_name))
1241 msg.info(
'Athena will be executed in a subshell via {0}'.format(self.
_cmd))
1247 if 'mpi' in self.
conf.argdict:
1251 if self.
conf.totalExecutorSteps > 1:
1253 outputDataDictionary = dict([ (dataType, self.
conf.dataDictionary[dataType])
for dataType
in self.
_output ])
1255 if self.
conf.executorStep == self.
conf.totalExecutorSteps - 1:
1261 for i
in range(self.
conf.totalExecutorSteps):
1262 for v
in self.
conf.dataDictionary[dataType].value:
1263 newValue.append(v.replace(
'_{0}{1}_'.format(executorStepSuffix, self.
conf.executorStep),
1264 '_{0}{1}_'.format(executorStepSuffix, i)))
1266 self.
conf.dataDictionary[dataType].multipleOK =
True
1268 for i
in range(self.
conf.totalExecutorSteps):
1269 newValue.append(self.
conf.dataDictionary[dataType].originalName +
'_{0}{1}'.format(executorStepSuffix, i))
1270 self.
conf.dataDictionary[dataType].value = newValue
1273 if self.
conf.dataDictionary[dataType].io ==
"output" and len(self.
conf.dataDictionary[dataType].value) > 1:
1278 outputDataDictionary = dict([ (dataType, self.
conf.dataDictionary[dataType])
for dataType
in self.
_output ])
1280 skipFileChecks=
False
1281 if 'eventService' in self.
conf.argdict
and self.
conf.argdict[
'eventService'].value:
1285 if self.
conf.dataDictionary[dataType].io ==
"output" and len(self.
conf.dataDictionary[dataType].value) > 1:
1288 if 'TXT_JIVEXMLTGZ' in self.
conf.dataDictionary:
1299 msg.info(
'scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self.
_logFileName))
1307 if (
'deleteIntermediateOutputfiles' in self.
conf._argdict
and self.
conf._argdict[
'deleteIntermediateOutputfiles'].value):
1308 inputDataDictionary = dict([ (dataType, self.
conf.dataDictionary[dataType])
for dataType
in self.
_input ])
1310 for k, v
in inputDataDictionary.items():
1311 if not v.io ==
'temporary':
1313 for filename
in v.value:
1314 if os.access(filename, os.R_OK)
and not filename.startswith(
"/cvmfs"):
1315 msg.info(
"Removing intermediate {0} input file {1}".format(k, filename))
1317 if (os.path.realpath(filename) != filename):
1318 targetpath = os.path.realpath(filename)
1320 if (targetpath)
and os.access(targetpath, os.R_OK):
1321 os.unlink(targetpath)
1327 deferredException =
None
1328 memLeakThreshold = 5000
1333 super(athenaExecutor, self).
validate()
1336 msg.error(
'Validation of return code failed: {0!s}'.format(e))
1337 deferredException = e
1347 msg.info(
'Analysing memory monitor output file {0} for possible memory leak'.format(self.
_memFullFile))
1352 msg.warning(
'Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
1354 msg.warning(
'Failed to analyse the memory monitor file {0}'.format(self.
_memFullFile))
1356 msg.info(
'No memory monitor file to be analysed')
1362 if 'ignorePatterns' in self.
conf.argdict:
1363 igPat = self.
conf.argdict[
'ignorePatterns'].value
1366 if 'ignoreFiles' in self.
conf.argdict:
1374 msg.info(
'Scanning logfile {0} for errors in substep {1}'.format(self.
_logFileName, self.
_substep))
1376 ignoreList=ignorePatterns)
1377 worstError = self.
_logScan.worstError()
1378 eventLoopWarnings = self.
_logScan.eventLoopWarnings()
1384 if worstError[
'firstError']:
1385 if len(worstError[
'firstError'][
'message']) > athenaExecutor._exitMessageLimit:
1386 if 'CoreDumpSvc' in worstError[
'firstError'][
'message']:
1387 exitErrorMessage =
"Core dump at line {0} (see jobReport for further details)".format(worstError[
'firstError'][
'firstLine'])
1388 elif 'G4Exception' in worstError[
'firstError'][
'message']:
1389 exitErrorMessage =
"G4 exception at line {0} (see jobReport for further details)".format(worstError[
'firstError'][
'firstLine'])
1391 exitErrorMessage =
"Long {0} message at line {1} (see jobReport for further details)".format(worstError[
'level'], worstError[
'firstError'][
'firstLine'])
1393 exitErrorMessage =
"Logfile error in {0}: \"{1}\"".format(self.
_logFileName, worstError[
'firstError'][
'message'])
1395 exitErrorMessage =
"Error level {0} found (see athena logfile for details)".format(worstError[
'level'])
1398 if deferredException
is not None:
1400 if worstError[
'nLevel'] >= stdLogLevels[
'ERROR']:
1401 deferredException.errMsg = deferredException.errMsg +
"; {0}".format(exitErrorMessage)
1404 deferredException.errMsg = deferredException.errMsg +
"; Possible memory leak: 'pss' slope: {0} KB/s".format(self.
_memLeakResult[
'slope'])
1405 raise deferredException
1409 if worstError[
'nLevel'] == stdLogLevels[
'ERROR']
and (
'ignoreErrors' in self.
conf.argdict
and self.
conf.argdict[
'ignoreErrors'].value
is True):
1410 msg.warning(
'Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
1412 elif worstError[
'nLevel'] >= stdLogLevels[
'ERROR']
and (
not mpi.mpiShouldValidate()):
1413 msg.warning(f
'Found {worstError["level"]} in the logfile in MPI rank {mpi.getMPIRank()} but moving on to be failure-tolerant')
1414 elif worstError[
'nLevel'] >= stdLogLevels[
'ERROR']:
1416 msg.error(
'Fatal error in athena logfile (level {0})'.format(worstError[
'level']))
1419 exitErrorMessage = exitErrorMessage +
"; Possible memory leak: 'pss' slope: {0} KB/s".format(self.
_memLeakResult[
'slope'])
1421 'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
1424 if (len(eventLoopWarnings) > 0):
1425 msg.warning(
'Found WARNINGS in the event loop, as follows:')
1426 for element
in eventLoopWarnings:
1427 msg.warning(
'{0} {1} ({2} instances)'.format(element[
'item'][
'service'],element[
'item'][
'message'],element[
'count']))
1430 msg.info(
'Executor {0} has validated successfully'.format(self.
name))
1434 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
1439 if 'CA' not in self.
conf.argdict:
1447 if self.
conf.argdict[
'CA']
is None:
1451 if self.
conf.argdict[
'CA'].returnMyValue(name=self.
name, substep=self.
substep)
is True:
1461 if 'athena' in self.
conf.argdict:
1466 currentSubstep =
None
1467 if 'athenaopts' in self.
conf.argdict:
1468 currentName = commonExecutorStepName(self.
name)
1469 if currentName
in self.
conf.argdict[
'athenaopts'].value:
1470 currentSubstep = currentName
1471 if self.
substep in self.
conf.argdict[
'athenaopts'].value:
1472 msg.info(
'Athenaopts found for {0} and {1}, joining options. '
1473 'Consider changing your configuration to use just the name or the alias of the substep.'
1475 self.
conf.argdict[
'athenaopts'].value[currentSubstep].extend(self.
conf.argdict[
'athenaopts'].value[self.
substep])
1476 del self.
conf.argdict[
'athenaopts'].value[self.
substep]
1477 msg.debug(
'Athenaopts: {0}'.format(self.
conf.argdict[
'athenaopts'].value))
1478 elif self.
substep in self.
conf.argdict[
'athenaopts'].value:
1480 elif 'all' in self.
conf.argdict[
'athenaopts'].value:
1481 currentSubstep =
'all'
1484 preLoadUpdated = dict()
1486 preLoadUpdated[currentSubstep] =
False
1487 if 'athenaopts' in self.
conf.argdict:
1488 if currentSubstep
is not None:
1489 for athArg
in self.
conf.argdict[
'athenaopts'].value[currentSubstep]:
1492 if athArg.startswith(
'--preloadlib'):
1494 i = self.
conf.argdict[
'athenaopts'].value[currentSubstep].
index(athArg)
1495 v = athArg.split(
'=', 1)[1]
1496 msg.info(
'Updating athena --preloadlib option for substep {1} with: {0}'.format(self.
_envUpdate.value(
'LD_PRELOAD'), self.
name))
1498 self.
conf.argdict[
'athenaopts']._value[currentSubstep][i] =
'--preloadlib={0}'.format(newPreloads)
1499 except Exception
as e:
1500 msg.warning(
'Failed to interpret athena option: {0} ({1})'.format(athArg, e))
1501 preLoadUpdated[currentSubstep] =
True
1503 if not preLoadUpdated[currentSubstep]:
1504 msg.info(
'Setting athena preloadlibs for substep {1} to: {0}'.format(self.
_envUpdate.value(
'LD_PRELOAD'), self.
name))
1505 if 'athenaopts' in self.
conf.argdict:
1506 if currentSubstep
is not None:
1507 self.
conf.argdict[
'athenaopts'].value[currentSubstep].append(
"--preloadlib={0}".format(self.
_envUpdate.value(
'LD_PRELOAD')))
1509 self.
conf.argdict[
'athenaopts'].value[
'all'] = [
"--preloadlib={0}".format(self.
_envUpdate.value(
'LD_PRELOAD'))]
1514 if 'athenaopts' in self.
conf.argdict:
1515 if currentSubstep
is None and "all" in self.
conf.argdict[
'athenaopts'].value:
1516 self.
_cmd.extend(self.
conf.argdict[
'athenaopts'].value[
'all'])
1517 elif currentSubstep
in self.
conf.argdict[
'athenaopts'].value:
1518 self.
_cmd.extend(self.
conf.argdict[
'athenaopts'].value[currentSubstep])
1520 if currentSubstep
is None:
1521 currentSubstep =
'all'
1525 msg.info(
'ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
1526 elif 'valgrind' in self.
conf._argdict
and self.
conf._argdict[
'valgrind'].value
is True:
1527 msg.info(
'Disabling "--drop-and-reload" because the job is configured to use Valgrind')
1528 elif 'athenaopts' in self.
conf.argdict:
1529 athenaConfigRelatedOpts = [
'--config-only',
'--drop-and-reload']
1531 if currentSubstep
in self.
conf.argdict[
'athenaopts'].value:
1532 conflictOpts =
set(athenaConfigRelatedOpts).
intersection(
set([opt.split(
'=')[0]
for opt
in self.
conf.argdict[
'athenaopts'].value[currentSubstep]]))
1533 if len(conflictOpts) > 0:
1534 msg.info(
'Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
1536 msg.info(
'Appending "--drop-and-reload" to athena options')
1537 self.
_cmd.append(
'--drop-and-reload')
1539 msg.info(
'No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.
name))
1540 self.
_cmd.append(
'--drop-and-reload')
1543 msg.info(
'Appending "--drop-and-reload" to athena options')
1544 self.
_cmd.append(
'--drop-and-reload')
1546 msg.info(
'Skipping test for "--drop-and-reload" in this executor')
1551 if not (
'athenaopts' in self.
conf.argdict
and
1552 any(
'--threads' in opt
for opt
in self.
conf.argdict[
'athenaopts'].value[currentSubstep])):
1557 if not (
'athenaopts' in self.
conf.argdict
and
1558 any(
'--nprocs' in opt
for opt
in self.
conf.argdict[
'athenaopts'].value[currentSubstep])):
1565 msg.info(
'Updated script arguments with topoptions: %s', self.
_cmd)
1585 setupATLAS =
'my_setupATLAS.sh'
1586 with open(setupATLAS,
'w')
as f:
1587 print(
"#!/bin/bash", file=f)
1589if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1590 export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1592source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1594 os.chmod(setupATLAS, 0o755)
1597 'Preparing wrapper file {wrapperFileName} with '
1598 'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1605 container_cmd =
None
1608 print(
'#!/bin/sh', file=wrapper)
1610 container_cmd = [ os.path.abspath(setupATLAS),
1618 print(
'echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1619 print(
'echo " '+
" ".join([
'setupATLAS'] + container_cmd[1:] + [path.join(
'.', self.
_wrapperFile)]) +
'"', file=wrapper)
1620 print(
'echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1622 print(
'echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1624 print(
'echo', file=wrapper)
1634 print(f
'source ./{setupATLAS} -q', file=wfile)
1635 print(f
'asetup {asetup}', file=wfile)
1636 print(
'if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1638 dbroot = path.dirname(dbsetup)
1639 dbversion = path.basename(dbroot)
1640 print(
"# DBRelease setup", file=wrapper)
1641 print(
'echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1642 print(
'export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1643 print(
'export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot,
'XMLConfig')), file=wrapper)
1644 print(
'export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot,
'XMLConfig')), file=wrapper)
1645 print(
'export TNS_ADMIN={directory}'.format(directory = path.join(dbroot,
'oracle-admin')), file=wrapper)
1646 print(
'DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1648 print(
"# AthenaMT explicitly disabled for this executor", file=wrapper)
1650 print(
"# AthenaMP explicitly disabled for this executor", file=wrapper)
1653 if not envSetting.startswith(
'LD_PRELOAD'):
1654 print(
"export", envSetting, file=wrapper)
1658 if 'valgrind' in self.
conf._argdict
and self.
conf._argdict[
'valgrind'].value
is True:
1659 msg.info(
'Valgrind engaged')
1662 AthenaSerialisedConfigurationFile =
"{name}Conf.pkl".format(
1666 print(
' '.join(self.
_cmd),
"--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1667 print(
'if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1670 if 'valgrindDefaultOpts' in self.
conf._argdict:
1671 defaultOptions = self.
conf._argdict[
'valgrindDefaultOpts'].value
1673 defaultOptions =
True
1674 if 'valgrindExtraOpts' in self.
conf._argdict:
1675 extraOptionsList = self.
conf._argdict[
'valgrindExtraOpts'].value
1677 extraOptionsList =
None
1678 msg.debug(
"requested Valgrind command basic options: {options}".format(options = defaultOptions))
1679 msg.debug(
"requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1680 command = ValgrindCommand(
1681 defaultOptions = defaultOptions,
1682 extraOptionsList = extraOptionsList,
1683 AthenaSerialisedConfigurationFile = \
1684 AthenaSerialisedConfigurationFile
1686 msg.debug(
"Valgrind command: {command}".format(command = command))
1687 print(command, file=wrapper)
1691 elif 'vtune' in self.
conf._argdict
and self.
conf._argdict[
'vtune'].value
is True:
1692 msg.info(
'VTune engaged')
1695 AthenaSerialisedConfigurationFile =
"{name}Conf.pkl".format(
1699 print(
' '.join(self.
_cmd),
"--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1700 print(
'if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1703 if 'vtuneDefaultOpts' in self.
conf._argdict:
1704 defaultOptions = self.
conf._argdict[
'vtuneDefaultOpts'].value
1706 defaultOptions =
True
1707 if 'vtuneExtraOpts' in self.
conf._argdict:
1708 extraOptionsList = self.
conf._argdict[
'vtuneExtraOpts'].value
1710 extraOptionsList =
None
1716 AthenaCommand = self.
_cmd
1717 AthenaCommand.append(AthenaSerialisedConfigurationFile)
1719 msg.debug(
"requested VTune command basic options: {options}".format(options = defaultOptions))
1720 msg.debug(
"requested VTune command extra options: {options}".format(options = extraOptionsList))
1721 command = VTuneCommand(
1722 defaultOptions = defaultOptions,
1723 extraOptionsList = extraOptionsList,
1724 AthenaCommand = AthenaCommand
1726 msg.debug(
"VTune command: {command}".format(command = command))
1727 print(command, file=wrapper)
1729 msg.info(
'Valgrind/VTune not engaged')
1731 print(
' '.join(self.
_cmd), file=wrapper)
1733 except OSError
as e:
1734 errMsg =
'error writing athena wrapper {fileName}: {error}'.format(
1740 trfExit.nameToCode(
'TRF_EXEC_SETUP_WRAPPER'),
1746 self.
_cmd = container_cmd + self.
_cmd
1753 if 'selfMerge' not in dir(fileArg):
1754 msg.info(
'Files in {0} cannot merged (no selfMerge() method is implemented)'.format(fileArg.name))
1757 if fileArg.mergeTargetSize == 0:
1758 msg.info(
'Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
1762 mergeCandidates = [list()]
1763 currentMergeSize = 0
1764 for fname
in fileArg.value:
1765 size = fileArg.getSingleMetadata(fname,
'file_size')
1766 if not isinstance(size, int):
1767 msg.warning(
'File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg,
type(size)))
1770 if len(mergeCandidates[-1]) == 0:
1771 msg.debug(
'Adding file {0} to current empty merge list'.format(fname))
1772 mergeCandidates[-1].append(fname)
1773 currentMergeSize += size
1776 if fileArg.mergeTargetSize < 0
or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
1777 msg.debug(
'Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
1778 mergeCandidates[-1].append(fname)
1779 currentMergeSize += size
1782 msg.debug(
'Starting a new merge list with file {0}'.format(fname))
1783 mergeCandidates.append([fname])
1784 currentMergeSize = size
1786 msg.debug(
'First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
1788 if len(mergeCandidates) == 1:
1791 mergeNames = [fileArg.originalName]
1796 for mergeGroup
in mergeCandidates:
1799 mergeName = fileArg.originalName +
'_{0}'.format(counter)
1800 while path.exists(mergeName):
1802 mergeName = fileArg.originalName +
'_{0}'.format(counter)
1803 mergeNames.append(mergeName)
1806 for targetName, mergeGroup, counter
in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
1807 msg.info(
'Want to merge files {0} to {1}'.format(mergeGroup, targetName))
1808 if len(mergeGroup) <= 1:
1809 msg.info(
'Skip merging for single file')
1812 self.
_myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.
conf.argdict))
1817 targetTGZName = self.
conf.dataDictionary[
'TXT_JIVEXMLTGZ'].value[0]
1818 if os.path.exists(targetTGZName):
1819 os.remove(targetTGZName)
1822 fNameRE = re.compile(
r"JiveXML\_\d+\_\d+.xml")
1825 tar = tarfile.open(targetTGZName,
"w:gz")
1826 for fName
in os.listdir(
'.'):
1827 matches = fNameRE.findall(fName)
1828 if len(matches) > 0:
1829 if fNameRE.findall(fName)[0] == fName:
1830 msg.info(
'adding %s to %s', fName, targetTGZName)
1834 msg.info(
'JiveXML compression: %s has been written and closed.', targetTGZName)
1844 super(optionalAthenaExecutor, self).
validate()
1847 msg.warning(
'Validation failed for {0}: {1}'.format(self.
_name, e))
1852 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
1867 def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
1868 inData = set(), outData =
set(), exe =
'athena.py', exeArgs = [
'athenaopts'], substep =
None, inputEventTest =
True,
1869 perfMonFile =
None, tryDropAndReload =
True, extraRunargs = {},
1870 manualDataDictionary =
None, memMonitor =
True):
1872 super(POOLMergeExecutor, self).
__init__(name, trf=trf, conf=conf, skeletonFile=skeletonFile, skeletonCA=skeletonCA,
1873 inData=inData, outData=outData, exe=exe, exeArgs=exeArgs, substep=substep,
1874 inputEventTest=inputEventTest, perfMonFile=perfMonFile,
1875 tryDropAndReload=tryDropAndReload, extraRunargs=extraRunargs,
1876 manualDataDictionary=manualDataDictionary, memMonitor=memMonitor)
1880 super(POOLMergeExecutor, self).
preExecute(input=input, output=output)
1885 super(POOLMergeExecutor, self).
execute()
1895 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
1896 if 'NTUP_PILEUP' not in output:
1898 if 'formats' not in self.
conf.argdict:
1900 'No derivation configuration specified')
1902 if (
'DAOD' not in output)
and (
'D2AOD' not in output):
1904 'No base name for DAOD output')
1907 if 'formats' in self.
conf.argdict: formatList = self.
conf.argdict[
'formats'].value
1908 for reduction
in formatList:
1909 if (
'DAOD' in output):
1910 dataType =
'DAOD_' + reduction
1911 if 'augmentations' not in self.
conf.argdict:
1912 outputName =
'DAOD_' + reduction +
'.' + self.
conf.argdict[
'outputDAODFile'].value[0]
1914 for val
in self.
conf.argdict[
'augmentations'].value:
1915 if reduction
in val.split(
':')[0]:
1916 outputName =
'DAOD_' + val.split(
':')[1] +
'.' + self.
conf.argdict[
'outputDAODFile'].value[0]
1919 outputName =
'DAOD_' + reduction +
'.' + self.
conf.argdict[
'outputDAODFile'].value[0]
1921 if (
'D2AOD' in output):
1922 dataType =
'D2AOD_' + reduction
1923 outputName =
'D2AOD_' + reduction +
'.' + self.
conf.argdict[
'outputD2AODFile'].value[0]
1925 msg.info(
'Adding reduction output type {0}'.format(dataType))
1926 output.add(dataType)
1930 self.
conf.dataDictionary[dataType] = newReduction
1934 if (
'DAOD' in output):
1935 output.remove(
'DAOD')
1936 del self.
conf.dataDictionary[
'DAOD']
1937 del self.
conf.argdict[
'outputDAODFile']
1938 if (
'D2AOD' in output):
1939 output.remove(
'D2AOD')
1940 del self.
conf.dataDictionary[
'D2AOD']
1941 del self.
conf.argdict[
'outputD2AODFile']
1943 msg.info(
'Data dictionary is now: {0}'.format(self.
conf.dataDictionary))
1944 msg.info(
'Input/Output: {0}/{1}'.format(input, output))
1946 msg.info(
'Data dictionary is now: {0}'.format(self.
conf.dataDictionary))
1947 msg.info(
'Input/Output: {0}/{1}'.format(input, output))
1948 super(reductionFrameworkExecutor, self).
preExecute(input, output)
1953 def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set([
'HIST_AOD',
'HIST_ESD']), outData=
set([
'HIST']),
1954 exe=
'DQHistogramMerge.py', exeArgs = [], memMonitor =
True):
1958 super(DQMergeExecutor, self).
__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
1959 exeArgs=exeArgs, memMonitor=memMonitor)
1964 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
1966 super(DQMergeExecutor, self).
preExecute(input=input, output=output)
1970 for dataType
in input:
1971 for fname
in self.
conf.dataDictionary[dataType].value:
1972 self.
conf.dataDictionary[dataType]._getNumberOfEvents([fname])
1973 print(fname, file=DQMergeFile)
1978 if len(output) != 1:
1980 'One (and only one) output file must be given to {0} (got {1})'.format(self.
name, len(output)))
1981 outDataType = list(output)[0]
1982 self.
_cmd.append(self.
conf.dataDictionary[outDataType].value[0])
1985 if (self.
conf._argdict.get(
"run_post_processing",
False)):
1986 self.
_cmd.append(
'True')
1988 self.
_cmd.append(
'False')
1990 if (self.
conf._argdict.get(
"is_incremental_merge",
False)):
1991 self.
_cmd.append(
'True')
1993 self.
_cmd.append(
'False')
1995 for k
in (
"excludeHist",
"excludeDir"):
1996 if k
in self.
conf._argdict:
1997 self.
_cmd.append(
"--{0}={1}".format(k,self.
conf._argdict[k]))
2002 super(DQMergeExecutor, self).
validate()
2004 exitErrorMessage =
''
2008 worstError = logScan.worstError()
2012 if worstError[
'firstError']:
2013 if len(worstError[
'firstError'][
'message']) > logScan._msgLimit:
2014 exitErrorMessage =
"Long {0} message at line {1}" \
2015 " (see jobReport for further details)".format(worstError[
'level'],
2016 worstError[
'firstError'][
'firstLine'])
2018 exitErrorMessage =
"Logfile error in {0}: \"{1}\"".format(self.
_logFileName,
2019 worstError[
'firstError'][
'message'])
2020 except OSError
as e:
2021 exitCode = trfExit.nameToCode(
'TRF_EXEC_LOGERROR')
2023 'Exception raised while attempting to scan logfile {0}: {1}'.format(self.
_logFileName, e))
2025 if worstError[
'nLevel'] == stdLogLevels[
'ERROR']
and (
2026 'ignoreErrors' in self.
conf.argdict
and self.
conf.argdict[
'ignoreErrors'].value
is True):
2027 msg.warning(
'Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2029 elif worstError[
'nLevel'] >= stdLogLevels[
'ERROR']:
2031 msg.error(
'Fatal error in script logfile (level {0})'.format(worstError[
'level']))
2032 exitCode = trfExit.nameToCode(
'TRF_EXEC_LOGERROR')
2036 msg.info(
'Executor {0} has validated successfully'.format(self.
name))
2040 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
2044 def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set([
'HIST']), outData=
set([
'HIST']),
2045 exe=
'DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor =
True):
2049 super(DQMPostProcessExecutor, self).
__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
2050 exeArgs=exeArgs, memMonitor=memMonitor)
2055 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
2057 super(DQMPostProcessExecutor, self).
preExecute(input=input, output=output)
2060 dsName=self.
conf.argdict[
"inputHISTFile"].dataset
2062 for dataType
in input:
2063 for fname
in self.
conf.dataDictionary[dataType].value:
2065 if not dsName: dsName=
".".join(fname.split(
'.')[0:4])
2066 inputList.append(
"#".join([dsName,fname]))
2069 if len(output) != 1:
2071 'One (and only one) output file must be given to {0} (got {1})'.format(self.
name, len(output)))
2072 outDataType = list(output)[0]
2076 wrapperParams={
"inputHistFiles" : inputList,
2077 "outputHistFile" : dsName+
"#"+self.
conf.dataDictionary[outDataType].value[0],
2078 "incrementalMode":
"True" if self.
conf._argdict.get(
"is_incremental_merge",
False)
else "False",
2079 "postProcessing" :
"True" if self.
conf._argdict.get(
"run_post_processing",
False)
else "False",
2080 "doWebDisplay" :
"True" if self.
conf._argdict.get(
"doWebDisplay",
False)
else "False",
2081 "allowCOOLUpload":
"True" if self.
conf._argdict.get(
"allowCOOLUpload",
False)
else "False",
2085 if "servers" in self.
conf._argdict:
2086 wrapperParams[
"server"]=self.
conf._argdict[
"servers"]
2088 for k
in (
"excludeHist",
"excludeDir"):
2089 if k
in self.
conf._argdict:
2090 wrapperParams[
"mergeParams"]+=(
" --{0}={1}".format(k,self.
conf._argdict[k]))
2093 with open(
"args.json",
"w")
as f:
2094 json.dump(wrapperParams, f)
2096 self.
_cmd.append(
"--argJSON=args.json")
2102 super(DQMPostProcessExecutor, self).
validate()
2104 exitErrorMessage =
''
2108 worstError = logScan.worstError()
2112 if worstError[
'firstError']:
2113 if len(worstError[
'firstError'][
'message']) > logScan._msgLimit:
2114 exitErrorMessage =
"Long {0} message at line {1}" \
2115 " (see jobReport for further details)".format(worstError[
'level'],
2116 worstError[
'firstError'][
'firstLine'])
2118 exitErrorMessage =
"Logfile error in {0}: \"{1}\"".format(self.
_logFileName,
2119 worstError[
'firstError'][
'message'])
2120 except OSError
as e:
2121 exitCode = trfExit.nameToCode(
'TRF_EXEC_LOGERROR')
2123 'Exception raised while attempting to scan logfile {0}: {1}'.format(self.
_logFileName, e))
2125 if worstError[
'nLevel'] == stdLogLevels[
'ERROR']
and (
2126 'ignoreErrors' in self.
conf.argdict
and self.
conf.argdict[
'ignoreErrors'].value
is True):
2127 msg.warning(
'Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2129 elif worstError[
'nLevel'] >= stdLogLevels[
'ERROR']:
2131 msg.error(
'Fatal error in script logfile (level {0})'.format(worstError[
'level']))
2132 exitCode = trfExit.nameToCode(
'TRF_EXEC_LOGERROR')
2136 msg.info(
'Executor {0} has validated successfully'.format(self.
name))
2140 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
2146 msg.debug(
'[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
2149 if self.
_exe is None:
2155 if len(output) != 1:
2157 'One (and only one) output file must be given to {0} (got {1})'.format(self.
name, len(output)))
2158 outDataType = list(output)[0]
2159 self.
_cmd.append(self.
conf.dataDictionary[outDataType].value[0])
2161 for dataType
in input:
2162 self.
_cmd.extend(self.
conf.dataDictionary[dataType].value)
2164 super(NTUPMergeExecutor, self).
preExecute(input=input, output=output)
2168 """Executor for running physvalPostProcessing.py with <input> <output> args"""
2172 msg.debug(
'[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
2176 if len(input) != 1
or len(output) != 1:
2178 f
'Exactly one input and one output must be specified (got inputs={len(input)}, outputs={len(output)})')
2180 self.
_cmd.append(self.
conf.dataDictionary[list(input)[0]].value[0])
2181 self.
_cmd.append(self.
conf.dataDictionary[list(output)[0]].value[0])
2184 super(NtupPhysValPostProcessingExecutor, self).
preExecute(input=input, output=output)
2195 if 'maskEmptyInputs' in self.
conf.argdict
and self.
conf.argdict[
'maskEmptyInputs'].value
is True:
2197 for fname
in self.
conf.dataDictionary[self.
_inputBS].value:
2198 nEvents = self.
conf.dataDictionary[self.
_inputBS].getSingleMetadata(fname,
'nentries')
2199 msg.debug(
'Found {0} events in file {1}'.format(nEvents, fname))
2200 if isinstance(nEvents, int)
and nEvents > 0:
2201 eventfullFiles.append(fname)
2204 msg.info(
'The following input files are masked because they have 0 events: {0}'.format(
' '.join(self.
_maskedFiles)))
2205 if len(eventfullFiles) == 0:
2206 if 'emptyStubFile' in self.
conf.argdict
and path.exists(self.
conf.argdict[
'emptyStubFile'].value):
2208 msg.info(
"All input files are empty - will use stub file {0} as output".format(self.
conf.argdict[
'emptyStubFile'].value))
2211 'All input files had zero events - aborting BS merge')
2218 for fname
in self.
conf.dataDictionary[self.
_inputBS].value:
2220 print(fname, file=BSFileList)
2221 except OSError
as e:
2222 errMsg =
'Got an error when writing list of BS files to {0}: {1}'.format(self.
_mergeBSFileList, e)
2231 elif self.
conf.argdict[
'allowRename'].value
is True:
2233 msg.info(
'Output filename does not end in "._0001.data" will proceed, but be aware that the internal filename metadata will be wrong')
2237 errmsg =
'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'
2243 super(bsMergeExecutor, self).
preExecute(input=input, output=output)
2249 msg.debug(
'exeStart time is {0}'.format(self.
_exeStart))
2250 msg.info(
"Using stub file for empty BS output - execution is fake")
2257 msg.debug(
'exeStop time is {0}'.format(self.
_exeStop))
2259 super(bsMergeExecutor, self).
execute()
2269 except OSError
as e:
2284 if 'outputArchFile' not in self.
conf.argdict:
2289 with open(
'zip_wrapper.py',
'w')
as zip_wrapper:
2290 print(
"import zipfile, os, shutil", file=zip_wrapper)
2291 if os.path.exists(self.
conf.argdict[
'outputArchFile'].value[0]):
2293 print(
"zf = zipfile.ZipFile('{}', mode='a', allowZip64=True)".format(self.
conf.argdict[
'outputArchFile'].value[0]), file=zip_wrapper)
2296 print(
"zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.
conf.argdict[
'outputArchFile'].value[0]), file=zip_wrapper)
2297 print(
"for f in {}:".format(self.
conf.argdict[
'inputDataFile'].value), file=zip_wrapper)
2300 print(
" if zipfile.is_zipfile(f) and '.zip' in f:", file=zip_wrapper)
2301 print(
" archive = zipfile.ZipFile(f, mode='r')", file=zip_wrapper)
2302 print(
" print 'Extracting input zip file {0} to temporary directory {1}'.format(f,'tmp')", file=zip_wrapper)
2303 print(
" archive.extractall('tmp')", file=zip_wrapper)
2304 print(
" archive.close()", file=zip_wrapper)
2306 print(
" if os.access(f, os.F_OK):", file=zip_wrapper)
2307 print(
" print 'Removing input zip file {}'.format(f)", file=zip_wrapper)
2308 print(
" os.unlink(f)", file=zip_wrapper)
2309 print(
" if os.path.isdir('tmp'):", file=zip_wrapper)
2310 print(
" for root, dirs, files in os.walk('tmp'):", file=zip_wrapper)
2311 print(
" for name in files:", file=zip_wrapper)
2312 print(
" print 'Zipping {}'.format(name)", file=zip_wrapper)
2313 print(
" zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
2314 print(
" shutil.rmtree('tmp')", file=zip_wrapper)
2315 print(
" else:", file=zip_wrapper)
2316 print(
" print 'Zipping {}'.format(os.path.basename(f))", file=zip_wrapper)
2317 print(
" zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
2318 print(
" if os.access(f, os.F_OK):", file=zip_wrapper)
2319 print(
" print 'Removing input file {}'.format(f)", file=zip_wrapper)
2320 print(
" os.unlink(f)", file=zip_wrapper)
2321 print(
"zf.close()", file=zip_wrapper)
2322 os.chmod(
'zip_wrapper.py', 0o755)
2323 except OSError
as e:
2324 errMsg =
'error writing zip wrapper {fileName}: {error}'.format(fileName =
'zip_wrapper.py',
2331 self.
_cmd.append(
'zip_wrapper.py')
2334 elif self.
_exe ==
'unarchive':
2336 for infile
in self.
conf.argdict[
'inputArchFile'].value:
2337 if not zipfile.is_zipfile(infile):
2339 'An input file is not a zip archive - aborting unpacking')
2340 self.
_cmd = [
'python']
2342 with open(
'unarchive_wrapper.py',
'w')
as unarchive_wrapper:
2343 print(
"import zipfile", file=unarchive_wrapper)
2344 print(
"for f in {}:".format(self.
conf.argdict[
'inputArchFile'].value), file=unarchive_wrapper)
2345 print(
" archive = zipfile.ZipFile(f, mode='r')", file=unarchive_wrapper)
2346 print(
" path = '{}'".format(self.
conf.argdict[
'path']), file=unarchive_wrapper)
2347 print(
" print 'Extracting archive {0} to {1}'.format(f,path)", file=unarchive_wrapper)
2348 print(
" archive.extractall(path)", file=unarchive_wrapper)
2349 print(
" archive.close()", file=unarchive_wrapper)
2350 os.chmod(
'unarchive_wrapper.py', 0o755)
2351 except OSError
as e:
2352 errMsg =
'error writing unarchive wrapper {fileName}: {error}'.format(fileName =
'unarchive_wrapper.py',
2359 self.
_cmd.append(
'unarchive_wrapper.py')
2360 super(archiveExecutor, self).
preExecute(input=input, output=output)
void print(char *figname, TCanvas *c1)
Argument class for substep lists, suitable for preExec/postExec.
Class holding the update to an environment that will be passed on to an executor.
Specialist execution class for DQM post-processing of histograms.
__init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']), exe='DQM_Tier0Wrapper_tf.py', exeArgs=[], memMonitor=True)
preExecute(self, input=set(), output=set())
Specialist execution class for merging DQ histograms.
__init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']), exe='DQHistogramMerge.py', exeArgs=[], memMonitor=True)
preExecute(self, input=set(), output=set())
Specialist execution class for merging NTUPLE files.
preExecute(self, input=set(), output=set())
Specialist execution class for running post processing on merged PHYSVAL NTUPLE file.
preExecute(self, input=set(), output=set())
preExecute(self, input=set(), output=set())
__init__(self, name='hybridPOOLMerge', trf=None, conf=None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton', inData=set(), outData=set(), exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, manualDataDictionary=None, memMonitor=True)
Initialise hybrid POOL merger athena executor.
preExecute(self, input=set(), output=set())
str _athenaMPEventOrdersFile
_writeAthenaWrapper(self, asetup=None, dbsetup=None, ossetup=None)
Write a wrapper script which runs asetup and then Athena.
_smartMerge(self, fileArg)
Manage smart merging of output files.
preExecute(self, input=set(), output=set())
__init__(self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
Initialise athena executor.
bool _athenaMPReadEventOrders
inputDataTypeCountCheck(self)
_prepAthenaCommandLine(self)
Prepare the correct command line to be used to invoke athena.
str _athenaMPWorkerTopDir
_isCAEnabled(self)
Check if running with CA.
_skeletonCA
Handle MPI setup.
_tryDropAndReload
Add --drop-and-reload if possible (and allowed!).
_envUpdate
Look for environment updates and prepare the athena command line.
Specialise the script executor to deal with the BS merge oddity of excluding empty DRAWs.
preExecute(self, input=set(), output=set())
__init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
__init__(self, name='Echo', trf=None)
setFromTransform(self, trf)
Set configuration properties from the parent transform.
addToArgdict(self, key, value)
Add a new object to the argdict.
addToDataDictionary(self, key, value)
Add a new object to the dataDictionary.
__init__(self, argdict={}, dataDictionary={}, firstExecutor=False)
Configuration for an executor.
Special executor that will enable a logfile scan as part of its validation.
__init__(self, name='Logscan')
preExecute(self, input=set(), output=set())
Athena executor where failure is not considered fatal.
Specialist executor to manage the handling of multiple implicit input and output files within the der...
preExecute(self, input=set(), output=set())
Take inputDAODFile and setup the actual outputs needed in this job.
_buildStandardCommand(self)
preExecute(self, input=set(), output=set())
__init__(self, name='Script', trf=None, conf=None, inData=set(), outData=set(), exe=None, exeArgs=None, memMonitor=True)
Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGL...
Small class used for validating event counts between input and output files.
Class of patterns that can be ignored from athena logfiles.
std::vector< std::string > intersection(std::vector< std::string > &v1, std::vector< std::string > &v2)
bool add(const std::string &hname, TKey *tobj)
std::vector< std::string > split(const std::string &s, const std::string &t=":")