20from fnmatch
import fnmatch
21msg = logging.getLogger(__name__)
25 cvmfsDBReleaseCheck, forceToAlphaNum, \
26 ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
27from PyJobTransforms.trfExeStepTools
import commonExecutorStepName, executorStepSuffix
33import PyJobTransforms.trfExceptions
as trfExceptions
36import PyJobTransforms.trfEnv
as trfEnv
46 enc = s.encoding.lower()
47 if enc.find(
'ascii') >= 0
or enc.find(
'ansi') >= 0:
48 return open (s.fileno(),
'w', encoding=
'utf-8')
63 def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False):
82 @dataDictionary.setter
106 @totalExecutorSteps.setter
138 def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData =
set()):
155 if len(dataOverlap) > 0:
157 'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self.
_name,
' '.join(dataOverlap)))
167 self.
conf.setFromTransform(trf)
220 if '_substep' in dir(self):
226 if '_trf' in dir(self):
237 if '_inData' in dir(self):
247 if '_inData' in dir(self):
257 if '_outData' in dir(self):
267 if '_outData' in dir(self):
278 if '_input' in dir(self):
287 if '_output' in dir(self):
326 if hasattr(self,
'_first'):
464 msg.debug(
'preExeStart time is {0}'.format(self.
_preExeStart))
469 msg.debug(
'valStart time is {0}'.format(self.
_valStart))
473 msg.info(
'Preexecute for %s', self.
_name)
477 msg.debug(
'exeStart time is {0}'.format(self.
_exeStart))
478 msg.info(
'Starting execution of %s', self.
_name)
482 msg.info(
'%s executor returns %d', self.
_name, self.
_rc)
484 msg.debug(
'preExeStop time is {0}'.format(self.
_exeStop))
487 msg.info(
'Postexecute for %s', self.
_name)
492 msg.info(
'Executor %s has no validation function - assuming all ok', self.
_name)
496 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
508 super(logscanExecutor, self).
__init__(name=name)
514 msg.info(
'Preexecute for %s', self.
_name)
515 if 'logfile' in self.
conf.argdict:
520 msg.info(
"Starting validation for {0}".format(self.
_name))
524 if 'ignorePatterns' in self.
conf.argdict:
525 igPat = self.
conf.argdict[
'ignorePatterns'].value
528 if 'ignoreFiles' in self.
conf.argdict:
536 msg.info(
'Scanning logfile {0} for errors'.format(self.
_logFileName))
538 worstError = self.
_logScan.worstError()
542 if worstError[
'firstError']:
543 if len(worstError[
'firstError'][
'message']) > athenaExecutor._exitMessageLimit:
544 if 'CoreDumpSvc' in worstError[
'firstError'][
'message']:
545 exitErrorMessage =
"Core dump at line {0} (see jobReport for further details)".format(worstError[
'firstError'][
'firstLine'])
546 elif 'G4Exception' in worstError[
'firstError'][
'message']:
547 exitErrorMessage =
"G4 exception at line {0} (see jobReport for further details)".format(worstError[
'firstError'][
'firstLine'])
549 exitErrorMessage =
"Long {0} message at line {1} (see jobReport for further details)".format(worstError[
'level'], worstError[
'firstError'][
'firstLine'])
551 exitErrorMessage =
"Logfile error in {0}: \"{1}\"".format(self.
_logFileName, worstError[
'firstError'][
'message'])
553 exitErrorMessage =
"Error level {0} found (see athena logfile for details)".format(worstError[
'level'])
556 if worstError[
'nLevel'] == stdLogLevels[
'ERROR']
and (
'ignoreErrors' in self.
conf.argdict
and self.
conf.argdict[
'ignoreErrors'].value
is True):
557 msg.warning(
'Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
558 elif worstError[
'nLevel'] >= stdLogLevels[
'ERROR']:
560 msg.error(
'Fatal error in athena logfile (level {0})'.format(worstError[
'level']))
562 'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
565 msg.info(
'Executor {0} has validated successfully'.format(self.
name))
570 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
577 super(echoExecutor, self).
__init__(name=name, trf=trf)
582 msg.debug(
'exeStart time is {0}'.format(self.
_exeStart))
583 msg.info(
'Starting execution of %s', self.
_name)
584 msg.info(
'Transform argument dictionary now follows:')
585 for k, v
in self.
conf.argdict.items():
586 print(
"%s = %s" % (k, v))
590 msg.info(
'%s executor returns %d', self.
_name, self.
_rc)
592 msg.debug(
'exeStop time is {0}'.format(self.
_exeStop))
596 def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData =
set()):
599 super(dummyExecutor, self).
__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
604 msg.debug(
'exeStart time is {0}'.format(self.
_exeStart))
605 msg.info(
'Starting execution of %s', self.
_name)
607 for k, v
in self.
conf.argdict.items():
609 msg.info(
'Creating dummy output file: {0}'.format(self.
conf.argdict[k].value[0]))
610 open(self.
conf.argdict[k].value[0],
'a').close()
614 msg.info(
'%s executor returns %d', self.
_name, self.
_rc)
616 msg.debug(
'exeStop time is {0}'.format(self.
_exeStop))
620 def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData =
set(),
621 exe =
None, exeArgs =
None, memMonitor =
True):
628 super(scriptExecutor, self).
__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
660 msg.debug(
'scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
666 if self.
_cmd is None:
668 msg.info(
'Will execute script as %s', self.
_cmd)
675 if 'TRF_ECHO' in os.environ:
676 msg.info(
'TRF_ECHO envvar is set - enabling command echoing to stdout')
678 elif 'TRF_NOECHO' in os.environ:
679 msg.info(
'TRF_NOECHO envvar is set - disabling command echoing to stdout')
682 elif isInteractiveEnv():
683 msg.info(
'Interactive environment detected (stdio or stdout is a tty) - enabling command echoing to stdout')
685 elif 'TZHOME' in os.environ:
686 msg.info(
'Tier-0 environment detected - enabling command echoing to stdout')
689 msg.info(
'Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self.
_name, self.
_logFileName))
696 encargs = {
'encoding' :
'utf-8'}
698 self.
_exeLogFile.setFormatter(logging.Formatter(
'%(asctime)s %(message)s', datefmt=
'%H:%M:%S'))
703 self.
_echostream.setFormatter(logging.Formatter(
'%(name)s %(asctime)s %(message)s', datefmt=
'%H:%M:%S'))
711 'No executor set in {0}'.format(self.__class__.__name__))
713 if arg
in self.
conf.argdict:
717 if isinstance(self.
conf.argdict[arg].value, list):
718 self.
_cmd.extend([ str(v)
for v
in self.
conf.argdict[arg].value])
720 self.
_cmd.append(str(self.
conf.argdict[arg].value))
725 msg.info(
'Starting execution of {0} ({1})'.format(self.
_name, self.
_cmd))
728 msg.debug(
'exeStart time is {0}'.format(self.
_exeStart))
729 if (
'execOnly' in self.
conf.argdict
and self.
conf.argdict[
'execOnly']
is True):
730 msg.info(
'execOnly flag is set - execution will now switch, replacing the transform')
733 encargs = {
'encoding' :
'utf8'}
739 msg.info(
"chdir /srv to launch a nested container for the substep")
741 p = subprocess.Popen(self.
_cmd, shell =
False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
744 msg.info(
"chdir {} after launching the nested container".format(self.
_workdir))
751 memMonitorCommand = [
'prmon',
'--pid', str(p.pid),
'--filename',
'prmon.full.' + self.
_name,
754 mem_proc = subprocess.Popen(memMonitorCommand, shell =
False, close_fds=
True, **encargs)
756 except Exception
as e:
757 msg.warning(
'Failed to spawn memory monitor for {0}: {1}'.format(self.
_name, e))
760 while p.poll()
is None:
762 line = p.stdout.readline()
765 except UnicodeDecodeError
as e:
766 msg.warning(
'Exception raised processing athena log: {0}'.format(e))
768 for line
in p.stdout:
772 msg.info(
'%s executor returns %d', self.
_name, self.
_rc)
774 msg.debug(
'exeStop time is {0}'.format(self.
_exeStop))
776 errMsg =
'Execution of {0} failed and raised OSError: {1}'.format(self.
_cmd[0], e)
782 mem_proc.send_signal(signal.SIGUSR1)
784 while (
not mem_proc.poll())
and countWait < 10:
798 except Exception
as e:
799 msg.warning(
'Failed to load JSON memory summmary file {0}: {1}'.format(self.
_memSummaryFile, e))
807 msg.debug(
'valStart time is {0}'.format(self.
_valStart))
812 msg.info(
'Executor {0} validated successfully (return code {1})'.format(self.
_name, self.
_rc))
823 if trfExit.codeToSignalname(self.
_rc) !=
"":
824 self.
_errMsg =
'{0} got a {1} signal (exit code {2})'.format(self.
_name, trfExit.codeToSignalname(self.
_rc), self.
_rc)
826 self.
_errMsg =
'Non-zero return code from %s (%d)' % (self.
_name, self.
_rc)
831 if 'checkEventCount' in self.
conf.argdict
and self.
conf.argdict[
'checkEventCount'].returnMyValue(exe=self)
is False:
832 msg.info(
'Event counting for substep {0} is skipped'.format(self.
name))
834 if 'mpi' in self.
conf.argdict
and not mpi.mpiShouldValidate():
835 msg.info(
'MPI mode -- skipping output event count check')
840 msg.info(
'Event counting for substep {0} passed'.format(self.
name))
843 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
848 _exitMessageLimit = 200
849 _defaultIgnorePatternFile = [
'atlas_error_mask.db']
888 def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
889 inData = set(), outData =
set(), inputDataTypeCountCheck =
None, exe =
'athena.py', exeArgs = [
'athenaopts'],
890 substep =
None, inputEventTest =
True, perfMonFile =
None, tryDropAndReload =
True, extraRunargs = {}, runtimeRunargs = {},
891 literalRunargs = [], dataArgs = [], checkEventCount =
False, errorMaskFiles =
None,
892 manualDataDictionary =
None, memMonitor =
True, disableMT =
False, disableMP =
False, onlyMP =
False, onlyMT =
False, onlyMPWithRunargs =
None):
912 msg.debug(
"Resource monitoring from PerfMon is now deprecated")
915 if isinstance(skeletonFile, str):
920 super(athenaExecutor, self).
__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
921 exeArgs=exeArgs, memMonitor=memMonitor)
928 self.
_jobOptionsTemplate = JobOptionsTemplate(exe = self, version =
'$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
936 @inputDataTypeCountCheck.setter
981 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
989 if self.
conf.dataDictionary[dataType].nentries ==
'UNDEFINED':
992 thisInputEvents = self.
conf.dataDictionary[dataType].nentries
993 if thisInputEvents > inputEvents:
994 inputEvents = thisInputEvents
998 if (
'skipEvents' in self.
conf.argdict
and
999 self.
conf.argdict[
'skipEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
is not None):
1000 mySkipEvents = self.
conf.argdict[
'skipEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
1004 if (
'maxEvents' in self.
conf.argdict
and
1005 self.
conf.argdict[
'maxEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
is not None):
1006 myMaxEvents = self.
conf.argdict[
'maxEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
1011 if (self.
_inputEventTest and mySkipEvents > 0
and mySkipEvents >= inputEvents):
1013 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(mySkipEvents, inputEvents, dt))
1017 if (myMaxEvents != -1):
1019 expectedEvents = myMaxEvents
1021 expectedEvents =
min(inputEvents-mySkipEvents, myMaxEvents)
1023 expectedEvents = inputEvents-mySkipEvents
1026 msg.info(
'input event count is UNDEFINED, setting expectedEvents to 0')
1032 OSSetupString =
None
1036 legacyThreadingRelease =
False
1037 if 'asetup' in self.
conf.argdict:
1038 asetupString = self.
conf.argdict[
'asetup'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
1039 legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
1041 msg.info(
'Asetup report: {0}'.format(asetupReport()))
1043 if asetupString
is not None:
1044 legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
1045 currentOS = os.environ[
'ALRB_USER_PLATFORM']
1046 if legacyOSRelease
and "centos7" not in currentOS:
1047 OSSetupString =
"centos7"
1048 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.format(self.
_substep, OSSetupString))
1052 if 'runInContainer' in self.
conf.argdict:
1053 OSSetupString = self.
conf.argdict[
'runInContainer'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
1054 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.format(self.
_substep, OSSetupString))
1055 if OSSetupString
is not None and asetupString
is None:
1057 '--asetup must be used for the substep which requires --runInContainer')
1062 if k
in self.
conf._argdict:
1066 if (((
'multithreaded' in self.
conf._argdict
and self.
conf._argdict[
'multithreaded'].value)
or (
'multiprocess' in self.
conf._argdict
and self.
conf._argdict[
'multiprocess'].value))
and
1067 (
'ATHENA_CORE_NUMBER' not in os.environ)):
1069 msg.warning(
'either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
1081 msg.info(
"This configuration does not support MT, falling back to MP")
1089 msg.info(
"This configuration does not support MP, using MT")
1098 msg.info(
"Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self.
_athenaMP))
1103 if self.
conf.totalExecutorSteps > 1:
1104 for dataType
in output:
1105 if self.
conf._dataDictionary[dataType].originalName:
1106 self.
conf._dataDictionary[dataType].value[0] = self.
conf._dataDictionary[dataType].originalName
1108 self.
conf._dataDictionary[dataType].originalName = self.
conf._dataDictionary[dataType].value[0]
1109 self.
conf._dataDictionary[dataType].value[0] +=
"_{0}{1}".format(executorStepSuffix, self.
conf.executorStep)
1110 msg.info(
"Updated athena output filename for {0} to {1}".format(dataType, self.
conf._dataDictionary[dataType].value[0]))
1117 if 'athenaMPUseEventOrders' in self.
conf.argdict
and self.
conf._argdict[
'athenaMPUseEventOrders'].value
is True:
1122 if (
'athenaMPStrategy' in self.
conf.argdict
and
1123 (self.
conf.argdict[
'athenaMPStrategy'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
is not None)):
1129 if 'athenaMPMergeTargetSize' in self.
conf.argdict:
1130 for dataType
in output:
1131 if dataType
in self.
conf.argdict[
'athenaMPMergeTargetSize'].value:
1132 self.
conf._dataDictionary[dataType].mergeTargetSize = self.
conf.argdict[
'athenaMPMergeTargetSize'].value[dataType] * 1000000
1133 msg.info(
'Set target merge size for {0} to {1}'.format(dataType, self.
conf._dataDictionary[dataType].mergeTargetSize))
1136 matchedViaGlob =
False
1137 for mtsType, mtsSize
in self.
conf.argdict[
'athenaMPMergeTargetSize'].value.items():
1138 if fnmatch(dataType, mtsType):
1139 self.
conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000
1140 msg.info(
'Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.
conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1141 matchedViaGlob =
True
1143 if not matchedViaGlob
and "ALL" in self.
conf.argdict[
'athenaMPMergeTargetSize'].value:
1144 self.
conf._dataDictionary[dataType].mergeTargetSize = self.
conf.argdict[
'athenaMPMergeTargetSize'].value[
"ALL"] * 1000000
1145 msg.info(
'Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.
conf._dataDictionary[dataType].mergeTargetSize))
1150 for dataType
in output:
1151 if self.
conf.totalExecutorSteps <= 1:
1152 self.
conf._dataDictionary[dataType].originalName = self.
conf._dataDictionary[dataType].value[0]
1153 if 'eventService' not in self.
conf.argdict
or 'eventService' in self.
conf.argdict
and self.
conf.argdict[
'eventService'].value
is False:
1154 if 'sharedWriter' in self.
conf.argdict
and self.
conf.argdict[
'sharedWriter'].value:
1155 msg.info(
"SharedWriter: not updating athena output filename for {0}".format(dataType))
1157 self.
conf._dataDictionary[dataType].value[0] +=
"_000"
1158 msg.info(
"Updated athena output filename for {0} to {1}".format(dataType, self.
conf._dataDictionary[dataType].value[0]))
1163 if 'mpi' in self.
conf.argdict:
1164 msg.info(
"Running in MPI mode")
1165 mpi.setupMPIConfig(output, self.
conf.dataDictionary)
1170 for dataType
in input:
1171 inputFiles[dataType] = self.
conf.dataDictionary[dataType]
1172 outputFiles = dict()
1173 for dataType
in output:
1174 outputFiles[dataType] = self.
conf.dataDictionary[dataType]
1177 nameForFiles = commonExecutorStepName(self.
_name)
1178 for dataType, dataArg
in self.
conf.dataDictionary.items():
1179 if isinstance(dataArg, list)
and dataArg:
1180 if self.
conf.totalExecutorSteps <= 1:
1181 raise ValueError(
'Multiple input arguments provided but only running one substep')
1182 if self.
conf.totalExecutorSteps != len(dataArg):
1183 raise ValueError(f
'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
1185 if dataArg[self.
conf.executorStep].io ==
'input' and nameForFiles
in dataArg[self.
conf.executorStep].executor:
1186 inputFiles[dataArg[self.
conf.executorStep].subtype] = dataArg
1188 if dataArg.io ==
'input' and nameForFiles
in dataArg.executor:
1189 inputFiles[dataArg.subtype] = dataArg
1191 msg.debug(
'Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
1195 output = outputFiles)
1205 dbrelease = dbsetup =
None
1206 if 'DBRelease' in self.
conf.argdict:
1207 dbrelease = self.
conf.argdict[
'DBRelease'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.
conf.firstExecutor)
1208 if path.islink(dbrelease):
1209 dbrelease = path.realpath(dbrelease)
1212 dbdMatch = re.match(
r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1214 msg.debug(
'DBRelease setting {0} matches classic tarball file'.format(dbrelease))
1215 if not os.access(dbrelease, os.R_OK):
1216 msg.warning(
'Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
1217 msg.warning(
'I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
1218 dbrelease = dbdMatch.group(1)
1219 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1222 msg.debug(
'Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
1223 unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
1226 setupDBRelease(dbsetup)
1229 dbsetup = cvmfsDBReleaseCheck(dbrelease)
1237 super(athenaExecutor, self).
preExecute(input, output)
1242 msg.info(
'Now writing wrapper for substep executor {0}'.format(self.
_name))
1244 msg.info(
'Athena will be executed in a subshell via {0}'.format(self.
_cmd))
1250 if 'mpi' in self.
conf.argdict:
1254 if self.
conf.totalExecutorSteps > 1:
1256 outputDataDictionary = dict([ (dataType, self.
conf.dataDictionary[dataType])
for dataType
in self.
_output ])
1258 if self.
conf.executorStep == self.
conf.totalExecutorSteps - 1:
1264 for i
in range(self.
conf.totalExecutorSteps):
1265 for v
in self.
conf.dataDictionary[dataType].value:
1266 newValue.append(v.replace(
'_{0}{1}_'.format(executorStepSuffix, self.
conf.executorStep),
1267 '_{0}{1}_'.format(executorStepSuffix, i)))
1269 self.
conf.dataDictionary[dataType].multipleOK =
True
1271 for i
in range(self.
conf.totalExecutorSteps):
1272 newValue.append(self.
conf.dataDictionary[dataType].originalName +
'_{0}{1}'.format(executorStepSuffix, i))
1273 self.
conf.dataDictionary[dataType].value = newValue
1276 if self.
conf.dataDictionary[dataType].io ==
"output" and len(self.
conf.dataDictionary[dataType].value) > 1:
1281 outputDataDictionary = dict([ (dataType, self.
conf.dataDictionary[dataType])
for dataType
in self.
_output ])
1283 skipFileChecks=
False
1284 if 'eventService' in self.
conf.argdict
and self.
conf.argdict[
'eventService'].value:
1288 if self.
conf.dataDictionary[dataType].io ==
"output" and len(self.
conf.dataDictionary[dataType].value) > 1:
1291 if 'TXT_JIVEXMLTGZ' in self.
conf.dataDictionary:
1302 msg.info(
'scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self.
_logFileName))
1310 if (
'deleteIntermediateOutputfiles' in self.
conf._argdict
and self.
conf._argdict[
'deleteIntermediateOutputfiles'].value):
1311 inputDataDictionary = dict([ (dataType, self.
conf.dataDictionary[dataType])
for dataType
in self.
_input ])
1313 for k, v
in inputDataDictionary.items():
1314 if not v.io ==
'temporary':
1316 for filename
in v.value:
1317 if os.access(filename, os.R_OK)
and not filename.startswith(
"/cvmfs"):
1318 msg.info(
"Removing intermediate {0} input file {1}".format(k, filename))
1320 if (os.path.realpath(filename) != filename):
1321 targetpath = os.path.realpath(filename)
1323 if (targetpath)
and os.access(targetpath, os.R_OK):
1324 os.unlink(targetpath)
1330 deferredException =
None
1331 memLeakThreshold = 5000
1336 super(athenaExecutor, self).
validate()
1339 msg.error(
'Validation of return code failed: {0!s}'.format(e))
1340 deferredException = e
1350 msg.info(
'Analysing memory monitor output file {0} for possible memory leak'.format(self.
_memFullFile))
1355 msg.warning(
'Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
1357 msg.warning(
'Failed to analyse the memory monitor file {0}'.format(self.
_memFullFile))
1359 msg.info(
'No memory monitor file to be analysed')
1365 if 'ignorePatterns' in self.
conf.argdict:
1366 igPat = self.
conf.argdict[
'ignorePatterns'].value
1369 if 'ignoreFiles' in self.
conf.argdict:
1377 msg.info(
'Scanning logfile {0} for errors in substep {1}'.format(self.
_logFileName, self.
_substep))
1379 ignoreList=ignorePatterns)
1380 worstError = self.
_logScan.worstError()
1381 eventLoopWarnings = self.
_logScan.eventLoopWarnings()
1387 if worstError[
'firstError']:
1388 if len(worstError[
'firstError'][
'message']) > athenaExecutor._exitMessageLimit:
1389 if 'CoreDumpSvc' in worstError[
'firstError'][
'message']:
1390 exitErrorMessage =
"Core dump at line {0} (see jobReport for further details)".format(worstError[
'firstError'][
'firstLine'])
1391 elif 'G4Exception' in worstError[
'firstError'][
'message']:
1392 exitErrorMessage =
"G4 exception at line {0} (see jobReport for further details)".format(worstError[
'firstError'][
'firstLine'])
1394 exitErrorMessage =
"Long {0} message at line {1} (see jobReport for further details)".format(worstError[
'level'], worstError[
'firstError'][
'firstLine'])
1396 exitErrorMessage =
"Logfile error in {0}: \"{1}\"".format(self.
_logFileName, worstError[
'firstError'][
'message'])
1398 exitErrorMessage =
"Error level {0} found (see athena logfile for details)".format(worstError[
'level'])
1401 if deferredException
is not None:
1403 if worstError[
'nLevel'] >= stdLogLevels[
'ERROR']:
1404 deferredException.errMsg = deferredException.errMsg +
"; {0}".format(exitErrorMessage)
1407 deferredException.errMsg = deferredException.errMsg +
"; Possible memory leak: 'pss' slope: {0} KB/s".format(self.
_memLeakResult[
'slope'])
1408 raise deferredException
1412 if worstError[
'nLevel'] == stdLogLevels[
'ERROR']
and (
'ignoreErrors' in self.
conf.argdict
and self.
conf.argdict[
'ignoreErrors'].value
is True):
1413 msg.warning(
'Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
1415 elif worstError[
'nLevel'] >= stdLogLevels[
'ERROR']
and (
not mpi.mpiShouldValidate()):
1416 msg.warning(f
'Found {worstError["level"]} in the logfile in MPI rank {mpi.getMPIRank()} but moving on to be failure-tolerant')
1417 elif worstError[
'nLevel'] >= stdLogLevels[
'ERROR']:
1419 msg.error(
'Fatal error in athena logfile (level {0})'.format(worstError[
'level']))
1422 exitErrorMessage = exitErrorMessage +
"; Possible memory leak: 'pss' slope: {0} KB/s".format(self.
_memLeakResult[
'slope'])
1424 'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
1427 if (len(eventLoopWarnings) > 0):
1428 msg.warning(
'Found WARNINGS in the event loop, as follows:')
1429 for element
in eventLoopWarnings:
1430 msg.warning(
'{0} {1} ({2} instances)'.format(element[
'item'][
'service'],element[
'item'][
'message'],element[
'count']))
1433 msg.info(
'Executor {0} has validated successfully'.format(self.
name))
1437 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
1442 if 'CA' not in self.
conf.argdict:
1450 if self.
conf.argdict[
'CA']
is None:
1454 if self.
conf.argdict[
'CA'].returnMyValue(name=self.
name, substep=self.
substep)
is True:
1464 if 'athena' in self.
conf.argdict:
1469 currentSubstep =
None
1470 if 'athenaopts' in self.
conf.argdict:
1471 currentName = commonExecutorStepName(self.
name)
1472 if currentName
in self.
conf.argdict[
'athenaopts'].value:
1473 currentSubstep = currentName
1474 if self.
substep in self.
conf.argdict[
'athenaopts'].value:
1475 msg.info(
'Athenaopts found for {0} and {1}, joining options. '
1476 'Consider changing your configuration to use just the name or the alias of the substep.'
1478 self.
conf.argdict[
'athenaopts'].value[currentSubstep].extend(self.
conf.argdict[
'athenaopts'].value[self.
substep])
1479 del self.
conf.argdict[
'athenaopts'].value[self.
substep]
1480 msg.debug(
'Athenaopts: {0}'.format(self.
conf.argdict[
'athenaopts'].value))
1481 elif self.
substep in self.
conf.argdict[
'athenaopts'].value:
1483 elif 'all' in self.
conf.argdict[
'athenaopts'].value:
1484 currentSubstep =
'all'
1487 preLoadUpdated = dict()
1489 preLoadUpdated[currentSubstep] =
False
1490 if 'athenaopts' in self.
conf.argdict:
1491 if currentSubstep
is not None:
1492 for athArg
in self.
conf.argdict[
'athenaopts'].value[currentSubstep]:
1495 if athArg.startswith(
'--preloadlib'):
1497 i = self.
conf.argdict[
'athenaopts'].value[currentSubstep].
index(athArg)
1498 v = athArg.split(
'=', 1)[1]
1499 msg.info(
'Updating athena --preloadlib option for substep {1} with: {0}'.format(self.
_envUpdate.value(
'LD_PRELOAD'), self.
name))
1501 self.
conf.argdict[
'athenaopts']._value[currentSubstep][i] =
'--preloadlib={0}'.format(newPreloads)
1502 except Exception
as e:
1503 msg.warning(
'Failed to interpret athena option: {0} ({1})'.format(athArg, e))
1504 preLoadUpdated[currentSubstep] =
True
1506 if not preLoadUpdated[currentSubstep]:
1507 msg.info(
'Setting athena preloadlibs for substep {1} to: {0}'.format(self.
_envUpdate.value(
'LD_PRELOAD'), self.
name))
1508 if 'athenaopts' in self.
conf.argdict:
1509 if currentSubstep
is not None:
1510 self.
conf.argdict[
'athenaopts'].value[currentSubstep].append(
"--preloadlib={0}".format(self.
_envUpdate.value(
'LD_PRELOAD')))
1512 self.
conf.argdict[
'athenaopts'].value[
'all'] = [
"--preloadlib={0}".format(self.
_envUpdate.value(
'LD_PRELOAD'))]
1517 if 'athenaopts' in self.
conf.argdict:
1518 if currentSubstep
is None and "all" in self.
conf.argdict[
'athenaopts'].value:
1519 self.
_cmd.extend(self.
conf.argdict[
'athenaopts'].value[
'all'])
1520 elif currentSubstep
in self.
conf.argdict[
'athenaopts'].value:
1521 self.
_cmd.extend(self.
conf.argdict[
'athenaopts'].value[currentSubstep])
1523 if currentSubstep
is None:
1524 currentSubstep =
'all'
1528 msg.info(
'ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
1529 elif 'valgrind' in self.
conf._argdict
and self.
conf._argdict[
'valgrind'].value
is True:
1530 msg.info(
'Disabling "--drop-and-reload" because the job is configured to use Valgrind')
1531 elif 'athenaopts' in self.
conf.argdict:
1532 athenaConfigRelatedOpts = [
'--config-only',
'--drop-and-reload']
1534 if currentSubstep
in self.
conf.argdict[
'athenaopts'].value:
1535 conflictOpts =
set(athenaConfigRelatedOpts).
intersection(
set([opt.split(
'=')[0]
for opt
in self.
conf.argdict[
'athenaopts'].value[currentSubstep]]))
1536 if len(conflictOpts) > 0:
1537 msg.info(
'Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
1539 msg.info(
'Appending "--drop-and-reload" to athena options')
1540 self.
_cmd.append(
'--drop-and-reload')
1542 msg.info(
'No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.
name))
1543 self.
_cmd.append(
'--drop-and-reload')
1546 msg.info(
'Appending "--drop-and-reload" to athena options')
1547 self.
_cmd.append(
'--drop-and-reload')
1549 msg.info(
'Skipping test for "--drop-and-reload" in this executor')
1554 if not (
'athenaopts' in self.
conf.argdict
and
1555 any(
'--threads' in opt
for opt
in self.
conf.argdict[
'athenaopts'].value[currentSubstep])):
1560 if not (
'athenaopts' in self.
conf.argdict
and
1561 any(
'--nprocs' in opt
for opt
in self.
conf.argdict[
'athenaopts'].value[currentSubstep])):
1568 msg.info(
'Updated script arguments with topoptions: %s', self.
_cmd)
1588 setupATLAS =
'my_setupATLAS.sh'
1589 with open(setupATLAS,
'w')
as f:
1590 print(
"#!/bin/bash", file=f)
1592if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
1593 export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
1595source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
1597 os.chmod(setupATLAS, 0o755)
1600 'Preparing wrapper file {wrapperFileName} with '
1601 'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
1608 container_cmd =
None
1611 print(
'#!/bin/sh', file=wrapper)
1613 container_cmd = [ os.path.abspath(setupATLAS),
1621 print(
'echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
1622 print(
'echo " '+
" ".join([
'setupATLAS'] + container_cmd[1:] + [path.join(
'.', self.
_wrapperFile)]) +
'"', file=wrapper)
1623 print(
'echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
1625 print(
'echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
1627 print(
'echo', file=wrapper)
1637 print(f
'source ./{setupATLAS} -q', file=wfile)
1638 print(f
'asetup {asetup}', file=wfile)
1639 print(
'if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
1641 dbroot = path.dirname(dbsetup)
1642 dbversion = path.basename(dbroot)
1643 print(
"# DBRelease setup", file=wrapper)
1644 print(
'echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
1645 print(
'export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
1646 print(
'export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot,
'XMLConfig')), file=wrapper)
1647 print(
'export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot,
'XMLConfig')), file=wrapper)
1648 print(
'export TNS_ADMIN={directory}'.format(directory = path.join(dbroot,
'oracle-admin')), file=wrapper)
1649 print(
'DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
1651 print(
"# AthenaMT explicitly disabled for this executor", file=wrapper)
1653 print(
"# AthenaMP explicitly disabled for this executor", file=wrapper)
1656 if not envSetting.startswith(
'LD_PRELOAD'):
1657 print(
"export", envSetting, file=wrapper)
1661 if 'valgrind' in self.
conf._argdict
and self.
conf._argdict[
'valgrind'].value
is True:
1662 msg.info(
'Valgrind engaged')
1665 AthenaSerialisedConfigurationFile =
"{name}Conf.pkl".format(
1669 print(
' '.join(self.
_cmd),
"--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1670 print(
'if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1673 if 'valgrindDefaultOpts' in self.
conf._argdict:
1674 defaultOptions = self.
conf._argdict[
'valgrindDefaultOpts'].value
1676 defaultOptions =
True
1677 if 'valgrindExtraOpts' in self.
conf._argdict:
1678 extraOptionsList = self.
conf._argdict[
'valgrindExtraOpts'].value
1680 extraOptionsList =
None
1681 msg.debug(
"requested Valgrind command basic options: {options}".format(options = defaultOptions))
1682 msg.debug(
"requested Valgrind command extra options: {options}".format(options = extraOptionsList))
1683 command = ValgrindCommand(
1684 defaultOptions = defaultOptions,
1685 extraOptionsList = extraOptionsList,
1686 AthenaSerialisedConfigurationFile = \
1687 AthenaSerialisedConfigurationFile
1689 msg.debug(
"Valgrind command: {command}".format(command = command))
1690 print(command, file=wrapper)
1694 elif 'vtune' in self.
conf._argdict
and self.
conf._argdict[
'vtune'].value
is True:
1695 msg.info(
'VTune engaged')
1698 AthenaSerialisedConfigurationFile =
"{name}Conf.pkl".format(
1702 print(
' '.join(self.
_cmd),
"--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
1703 print(
'if [ $? != "0" ]; then exit 255; fi', file=wrapper)
1706 if 'vtuneDefaultOpts' in self.
conf._argdict:
1707 defaultOptions = self.
conf._argdict[
'vtuneDefaultOpts'].value
1709 defaultOptions =
True
1710 if 'vtuneExtraOpts' in self.
conf._argdict:
1711 extraOptionsList = self.
conf._argdict[
'vtuneExtraOpts'].value
1713 extraOptionsList =
None
1719 AthenaCommand = self.
_cmd
1720 AthenaCommand.append(AthenaSerialisedConfigurationFile)
1722 msg.debug(
"requested VTune command basic options: {options}".format(options = defaultOptions))
1723 msg.debug(
"requested VTune command extra options: {options}".format(options = extraOptionsList))
1724 command = VTuneCommand(
1725 defaultOptions = defaultOptions,
1726 extraOptionsList = extraOptionsList,
1727 AthenaCommand = AthenaCommand
1729 msg.debug(
"VTune command: {command}".format(command = command))
1730 print(command, file=wrapper)
1732 msg.info(
'Valgrind/VTune not engaged')
1734 print(
' '.join(self.
_cmd), file=wrapper)
1736 except OSError
as e:
1737 errMsg =
'error writing athena wrapper {fileName}: {error}'.format(
1743 trfExit.nameToCode(
'TRF_EXEC_SETUP_WRAPPER'),
1749 self.
_cmd = container_cmd + self.
_cmd
1756 if 'selfMerge' not in dir(fileArg):
1757 msg.info(
'Files in {0} cannot merged (no selfMerge() method is implemented)'.format(fileArg.name))
1760 if fileArg.mergeTargetSize == 0:
1761 msg.info(
'Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
1765 mergeCandidates = [list()]
1766 currentMergeSize = 0
1767 for fname
in fileArg.value:
1768 size = fileArg.getSingleMetadata(fname,
'file_size')
1769 if not isinstance(size, int):
1770 msg.warning(
'File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg,
type(size)))
1773 if len(mergeCandidates[-1]) == 0:
1774 msg.debug(
'Adding file {0} to current empty merge list'.format(fname))
1775 mergeCandidates[-1].append(fname)
1776 currentMergeSize += size
1779 if fileArg.mergeTargetSize < 0
or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
1780 msg.debug(
'Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
1781 mergeCandidates[-1].append(fname)
1782 currentMergeSize += size
1785 msg.debug(
'Starting a new merge list with file {0}'.format(fname))
1786 mergeCandidates.append([fname])
1787 currentMergeSize = size
1789 msg.debug(
'First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
1791 if len(mergeCandidates) == 1:
1794 mergeNames = [fileArg.originalName]
1799 for mergeGroup
in mergeCandidates:
1802 mergeName = fileArg.originalName +
'_{0}'.format(counter)
1803 while path.exists(mergeName):
1805 mergeName = fileArg.originalName +
'_{0}'.format(counter)
1806 mergeNames.append(mergeName)
1809 for targetName, mergeGroup, counter
in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
1810 msg.info(
'Want to merge files {0} to {1}'.format(mergeGroup, targetName))
1811 if len(mergeGroup) <= 1:
1812 msg.info(
'Skip merging for single file')
1815 self.
_myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.
conf.argdict))
1820 targetTGZName = self.
conf.dataDictionary[
'TXT_JIVEXMLTGZ'].value[0]
1821 if os.path.exists(targetTGZName):
1822 os.remove(targetTGZName)
1825 fNameRE = re.compile(
r"JiveXML\_\d+\_\d+.xml")
1828 tar = tarfile.open(targetTGZName,
"w:gz")
1829 for fName
in os.listdir(
'.'):
1830 matches = fNameRE.findall(fName)
1831 if len(matches) > 0:
1832 if fNameRE.findall(fName)[0] == fName:
1833 msg.info(
'adding %s to %s', fName, targetTGZName)
1837 msg.info(
'JiveXML compression: %s has been written and closed.', targetTGZName)
1847 super(optionalAthenaExecutor, self).
validate()
1850 msg.warning(
'Validation failed for {0}: {1}'.format(self.
_name, e))
1855 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
1870 def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
1871 inData = set(), outData =
set(), exe =
'athena.py', exeArgs = [
'athenaopts'], substep =
None, inputEventTest =
True,
1872 perfMonFile =
None, tryDropAndReload =
True, extraRunargs = {},
1873 manualDataDictionary =
None, memMonitor =
True):
1875 super(POOLMergeExecutor, self).
__init__(name, trf=trf, conf=conf, skeletonFile=skeletonFile, skeletonCA=skeletonCA,
1876 inData=inData, outData=outData, exe=exe, exeArgs=exeArgs, substep=substep,
1877 inputEventTest=inputEventTest, perfMonFile=perfMonFile,
1878 tryDropAndReload=tryDropAndReload, extraRunargs=extraRunargs,
1879 manualDataDictionary=manualDataDictionary, memMonitor=memMonitor)
1883 super(POOLMergeExecutor, self).
preExecute(input=input, output=output)
1888 super(POOLMergeExecutor, self).
execute()
1898 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
1899 if 'NTUP_PILEUP' not in output:
1901 if 'formats' not in self.
conf.argdict:
1903 'No derivation configuration specified')
1905 if (
'DAOD' not in output)
and (
'D2AOD' not in output):
1907 'No base name for DAOD output')
1910 if 'formats' in self.
conf.argdict: formatList = self.
conf.argdict[
'formats'].value
1911 for reduction
in formatList:
1912 if (
'DAOD' in output):
1913 dataType =
'DAOD_' + reduction
1914 if 'augmentations' not in self.
conf.argdict:
1915 outputName =
'DAOD_' + reduction +
'.' + self.
conf.argdict[
'outputDAODFile'].value[0]
1917 for val
in self.
conf.argdict[
'augmentations'].value:
1918 if reduction
in val.split(
':')[0]:
1919 outputName =
'DAOD_' + val.split(
':')[1] +
'.' + self.
conf.argdict[
'outputDAODFile'].value[0]
1922 outputName =
'DAOD_' + reduction +
'.' + self.
conf.argdict[
'outputDAODFile'].value[0]
1924 if (
'D2AOD' in output):
1925 dataType =
'D2AOD_' + reduction
1926 outputName =
'D2AOD_' + reduction +
'.' + self.
conf.argdict[
'outputD2AODFile'].value[0]
1928 msg.info(
'Adding reduction output type {0}'.format(dataType))
1929 output.add(dataType)
1933 self.
conf.dataDictionary[dataType] = newReduction
1937 if (
'DAOD' in output):
1938 output.remove(
'DAOD')
1939 del self.
conf.dataDictionary[
'DAOD']
1940 del self.
conf.argdict[
'outputDAODFile']
1941 if (
'D2AOD' in output):
1942 output.remove(
'D2AOD')
1943 del self.
conf.dataDictionary[
'D2AOD']
1944 del self.
conf.argdict[
'outputD2AODFile']
1946 msg.info(
'Data dictionary is now: {0}'.format(self.
conf.dataDictionary))
1947 msg.info(
'Input/Output: {0}/{1}'.format(input, output))
1949 msg.info(
'Data dictionary is now: {0}'.format(self.
conf.dataDictionary))
1950 msg.info(
'Input/Output: {0}/{1}'.format(input, output))
1951 super(reductionFrameworkExecutor, self).
preExecute(input, output)
1956 def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set([
'HIST_AOD',
'HIST_ESD']), outData=
set([
'HIST']),
1957 exe=
'DQHistogramMerge.py', exeArgs = [], memMonitor =
True):
1961 super(DQMergeExecutor, self).
__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
1962 exeArgs=exeArgs, memMonitor=memMonitor)
1967 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
1969 super(DQMergeExecutor, self).
preExecute(input=input, output=output)
1973 for dataType
in input:
1974 for fname
in self.
conf.dataDictionary[dataType].value:
1975 self.
conf.dataDictionary[dataType]._getNumberOfEvents([fname])
1976 print(fname, file=DQMergeFile)
1981 if len(output) != 1:
1983 'One (and only one) output file must be given to {0} (got {1})'.format(self.
name, len(output)))
1984 outDataType = list(output)[0]
1985 self.
_cmd.append(self.
conf.dataDictionary[outDataType].value[0])
1988 if (self.
conf._argdict.get(
"run_post_processing",
False)):
1989 self.
_cmd.append(
'True')
1991 self.
_cmd.append(
'False')
1993 if (self.
conf._argdict.get(
"is_incremental_merge",
False)):
1994 self.
_cmd.append(
'True')
1996 self.
_cmd.append(
'False')
1998 for k
in (
"excludeHist",
"excludeDir"):
1999 if k
in self.
conf._argdict:
2000 self.
_cmd.append(
"--{0}={1}".format(k,self.
conf._argdict[k]))
2005 super(DQMergeExecutor, self).
validate()
2007 exitErrorMessage =
''
2011 worstError = logScan.worstError()
2015 if worstError[
'firstError']:
2016 if len(worstError[
'firstError'][
'message']) > logScan._msgLimit:
2017 exitErrorMessage =
"Long {0} message at line {1}" \
2018 " (see jobReport for further details)".format(worstError[
'level'],
2019 worstError[
'firstError'][
'firstLine'])
2021 exitErrorMessage =
"Logfile error in {0}: \"{1}\"".format(self.
_logFileName,
2022 worstError[
'firstError'][
'message'])
2023 except OSError
as e:
2024 exitCode = trfExit.nameToCode(
'TRF_EXEC_LOGERROR')
2026 'Exception raised while attempting to scan logfile {0}: {1}'.format(self.
_logFileName, e))
2028 if worstError[
'nLevel'] == stdLogLevels[
'ERROR']
and (
2029 'ignoreErrors' in self.
conf.argdict
and self.
conf.argdict[
'ignoreErrors'].value
is True):
2030 msg.warning(
'Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2032 elif worstError[
'nLevel'] >= stdLogLevels[
'ERROR']:
2034 msg.error(
'Fatal error in script logfile (level {0})'.format(worstError[
'level']))
2035 exitCode = trfExit.nameToCode(
'TRF_EXEC_LOGERROR')
2039 msg.info(
'Executor {0} has validated successfully'.format(self.
name))
2043 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
2047 def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set([
'HIST']), outData=
set([
'HIST']),
2048 exe=
'DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor =
True):
2052 super(DQMPostProcessExecutor, self).
__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
2053 exeArgs=exeArgs, memMonitor=memMonitor)
2058 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
2060 super(DQMPostProcessExecutor, self).
preExecute(input=input, output=output)
2063 dsName=self.
conf.argdict[
"inputHISTFile"].dataset
2065 for dataType
in input:
2066 for fname
in self.
conf.dataDictionary[dataType].value:
2068 if not dsName: dsName=
".".join(fname.split(
'.')[0:4])
2069 inputList.append(
"#".join([dsName,fname]))
2072 if len(output) != 1:
2074 'One (and only one) output file must be given to {0} (got {1})'.format(self.
name, len(output)))
2075 outDataType = list(output)[0]
2079 wrapperParams={
"inputHistFiles" : inputList,
2080 "outputHistFile" : dsName+
"#"+self.
conf.dataDictionary[outDataType].value[0],
2081 "incrementalMode":
"True" if self.
conf._argdict.get(
"is_incremental_merge",
False)
else "False",
2082 "postProcessing" :
"True" if self.
conf._argdict.get(
"run_post_processing",
False)
else "False",
2083 "doWebDisplay" :
"True" if self.
conf._argdict.get(
"doWebDisplay",
False)
else "False",
2084 "allowCOOLUpload":
"True" if self.
conf._argdict.get(
"allowCOOLUpload",
False)
else "False",
2088 if "servers" in self.
conf._argdict:
2089 wrapperParams[
"server"]=self.
conf._argdict[
"servers"]
2091 for k
in (
"excludeHist",
"excludeDir"):
2092 if k
in self.
conf._argdict:
2093 wrapperParams[
"mergeParams"]+=(
" --{0}={1}".format(k,self.
conf._argdict[k]))
2096 with open(
"args.json",
"w")
as f:
2097 json.dump(wrapperParams, f)
2099 self.
_cmd.append(
"--argJSON=args.json")
2105 super(DQMPostProcessExecutor, self).
validate()
2107 exitErrorMessage =
''
2111 worstError = logScan.worstError()
2115 if worstError[
'firstError']:
2116 if len(worstError[
'firstError'][
'message']) > logScan._msgLimit:
2117 exitErrorMessage =
"Long {0} message at line {1}" \
2118 " (see jobReport for further details)".format(worstError[
'level'],
2119 worstError[
'firstError'][
'firstLine'])
2121 exitErrorMessage =
"Logfile error in {0}: \"{1}\"".format(self.
_logFileName,
2122 worstError[
'firstError'][
'message'])
2123 except OSError
as e:
2124 exitCode = trfExit.nameToCode(
'TRF_EXEC_LOGERROR')
2126 'Exception raised while attempting to scan logfile {0}: {1}'.format(self.
_logFileName, e))
2128 if worstError[
'nLevel'] == stdLogLevels[
'ERROR']
and (
2129 'ignoreErrors' in self.
conf.argdict
and self.
conf.argdict[
'ignoreErrors'].value
is True):
2130 msg.warning(
'Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
2132 elif worstError[
'nLevel'] >= stdLogLevels[
'ERROR']:
2134 msg.error(
'Fatal error in script logfile (level {0})'.format(worstError[
'level']))
2135 exitCode = trfExit.nameToCode(
'TRF_EXEC_LOGERROR')
2139 msg.info(
'Executor {0} has validated successfully'.format(self.
name))
2143 msg.debug(
'valStop time is {0}'.format(self.
_valStop))
2149 msg.debug(
'[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
2152 if self.
_exe is None:
2158 if len(output) != 1:
2160 'One (and only one) output file must be given to {0} (got {1})'.format(self.
name, len(output)))
2161 outDataType = list(output)[0]
2162 self.
_cmd.append(self.
conf.dataDictionary[outDataType].value[0])
2164 for dataType
in input:
2165 self.
_cmd.extend(self.
conf.dataDictionary[dataType].value)
2167 super(NTUPMergeExecutor, self).
preExecute(input=input, output=output)
2171 """Executor for running physvalPostProcessing.py with <input> <output> args"""
2175 msg.debug(
'[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
2179 if len(input) != 1
or len(output) != 1:
2181 f
'Exactly one input and one output must be specified (got inputs={len(input)}, outputs={len(output)})')
2183 self.
_cmd.append(self.
conf.dataDictionary[list(input)[0]].value[0])
2184 self.
_cmd.append(self.
conf.dataDictionary[list(output)[0]].value[0])
2187 super(NtupPhysValPostProcessingExecutor, self).
preExecute(input=input, output=output)
2198 if 'maskEmptyInputs' in self.
conf.argdict
and self.
conf.argdict[
'maskEmptyInputs'].value
is True:
2200 for fname
in self.
conf.dataDictionary[self.
_inputBS].value:
2201 nEvents = self.
conf.dataDictionary[self.
_inputBS].getSingleMetadata(fname,
'nentries')
2202 msg.debug(
'Found {0} events in file {1}'.format(nEvents, fname))
2203 if isinstance(nEvents, int)
and nEvents > 0:
2204 eventfullFiles.append(fname)
2207 msg.info(
'The following input files are masked because they have 0 events: {0}'.format(
' '.join(self.
_maskedFiles)))
2208 if len(eventfullFiles) == 0:
2209 if 'emptyStubFile' in self.
conf.argdict
and path.exists(self.
conf.argdict[
'emptyStubFile'].value):
2211 msg.info(
"All input files are empty - will use stub file {0} as output".format(self.
conf.argdict[
'emptyStubFile'].value))
2214 'All input files had zero events - aborting BS merge')
2221 for fname
in self.
conf.dataDictionary[self.
_inputBS].value:
2223 print(fname, file=BSFileList)
2224 except OSError
as e:
2225 errMsg =
'Got an error when writing list of BS files to {0}: {1}'.format(self.
_mergeBSFileList, e)
2234 elif self.
conf.argdict[
'allowRename'].value
is True:
2236 msg.info(
'Output filename does not end in "._0001.data" will proceed, but be aware that the internal filename metadata will be wrong')
2240 errmsg =
'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'
2246 super(bsMergeExecutor, self).
preExecute(input=input, output=output)
2252 msg.debug(
'exeStart time is {0}'.format(self.
_exeStart))
2253 msg.info(
"Using stub file for empty BS output - execution is fake")
2260 msg.debug(
'exeStop time is {0}'.format(self.
_exeStop))
2262 super(bsMergeExecutor, self).
execute()
2272 except OSError
as e:
2287 if 'outputArchFile' not in self.
conf.argdict:
2292 with open(
'zip_wrapper.py',
'w')
as zip_wrapper:
2293 print(
"import zipfile, os, shutil", file=zip_wrapper)
2294 if os.path.exists(self.
conf.argdict[
'outputArchFile'].value[0]):
2296 print(
"zf = zipfile.ZipFile('{}', mode='a', allowZip64=True)".format(self.
conf.argdict[
'outputArchFile'].value[0]), file=zip_wrapper)
2299 print(
"zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.
conf.argdict[
'outputArchFile'].value[0]), file=zip_wrapper)
2300 print(
"for f in {}:".format(self.
conf.argdict[
'inputDataFile'].value), file=zip_wrapper)
2303 print(
" if zipfile.is_zipfile(f) and '.zip' in f:", file=zip_wrapper)
2304 print(
" archive = zipfile.ZipFile(f, mode='r')", file=zip_wrapper)
2305 print(
" print 'Extracting input zip file {0} to temporary directory {1}'.format(f,'tmp')", file=zip_wrapper)
2306 print(
" archive.extractall('tmp')", file=zip_wrapper)
2307 print(
" archive.close()", file=zip_wrapper)
2309 print(
" if os.access(f, os.F_OK):", file=zip_wrapper)
2310 print(
" print 'Removing input zip file {}'.format(f)", file=zip_wrapper)
2311 print(
" os.unlink(f)", file=zip_wrapper)
2312 print(
" if os.path.isdir('tmp'):", file=zip_wrapper)
2313 print(
" for root, dirs, files in os.walk('tmp'):", file=zip_wrapper)
2314 print(
" for name in files:", file=zip_wrapper)
2315 print(
" print 'Zipping {}'.format(name)", file=zip_wrapper)
2316 print(
" zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
2317 print(
" shutil.rmtree('tmp')", file=zip_wrapper)
2318 print(
" else:", file=zip_wrapper)
2319 print(
" print 'Zipping {}'.format(os.path.basename(f))", file=zip_wrapper)
2320 print(
" zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
2321 print(
" if os.access(f, os.F_OK):", file=zip_wrapper)
2322 print(
" print 'Removing input file {}'.format(f)", file=zip_wrapper)
2323 print(
" os.unlink(f)", file=zip_wrapper)
2324 print(
"zf.close()", file=zip_wrapper)
2325 os.chmod(
'zip_wrapper.py', 0o755)
2326 except OSError
as e:
2327 errMsg =
'error writing zip wrapper {fileName}: {error}'.format(fileName =
'zip_wrapper.py',
2334 self.
_cmd.append(
'zip_wrapper.py')
2337 elif self.
_exe ==
'unarchive':
2339 for infile
in self.
conf.argdict[
'inputArchFile'].value:
2340 if not zipfile.is_zipfile(infile):
2342 'An input file is not a zip archive - aborting unpacking')
2343 self.
_cmd = [
'python']
2345 with open(
'unarchive_wrapper.py',
'w')
as unarchive_wrapper:
2346 print(
"import zipfile", file=unarchive_wrapper)
2347 print(
"for f in {}:".format(self.
conf.argdict[
'inputArchFile'].value), file=unarchive_wrapper)
2348 print(
" archive = zipfile.ZipFile(f, mode='r')", file=unarchive_wrapper)
2349 print(
" path = '{}'".format(self.
conf.argdict[
'path']), file=unarchive_wrapper)
2350 print(
" print 'Extracting archive {0} to {1}'.format(f,path)", file=unarchive_wrapper)
2351 print(
" archive.extractall(path)", file=unarchive_wrapper)
2352 print(
" archive.close()", file=unarchive_wrapper)
2353 os.chmod(
'unarchive_wrapper.py', 0o755)
2354 except OSError
as e:
2355 errMsg =
'error writing unarchive wrapper {fileName}: {error}'.format(fileName =
'unarchive_wrapper.py',
2362 self.
_cmd.append(
'unarchive_wrapper.py')
2363 super(archiveExecutor, self).
preExecute(input=input, output=output)
void print(char *figname, TCanvas *c1)
Argument class for substep lists, suitable for preExec/postExec.
Class holding the update to an environment that will be passed on to an executor.
Specialist execution class for DQM post-processing of histograms.
__init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']), exe='DQM_Tier0Wrapper_tf.py', exeArgs=[], memMonitor=True)
preExecute(self, input=set(), output=set())
Specialist execution class for merging DQ histograms.
__init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']), exe='DQHistogramMerge.py', exeArgs=[], memMonitor=True)
preExecute(self, input=set(), output=set())
Specialist execution class for merging NTUPLE files.
preExecute(self, input=set(), output=set())
Specialist execution class for running post-processing on a merged PHYSVAL NTUPLE file.
preExecute(self, input=set(), output=set())
preExecute(self, input=set(), output=set())
__init__(self, name='hybridPOOLMerge', trf=None, conf=None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton', inData=set(), outData=set(), exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, manualDataDictionary=None, memMonitor=True)
Initialise hybrid POOL merger athena executor.
preExecute(self, input=set(), output=set())
str _athenaMPEventOrdersFile
_writeAthenaWrapper(self, asetup=None, dbsetup=None, ossetup=None)
Write a wrapper script which runs asetup and then Athena.
_smartMerge(self, fileArg)
Manage smart merging of output files.
preExecute(self, input=set(), output=set())
__init__(self, name='athena', trf=None, conf=None, skeletonFile=None, skeletonCA=None, inData=set(), outData=set(), inputDataTypeCountCheck=None, exe='athena.py', exeArgs=['athenaopts'], substep=None, inputEventTest=True, perfMonFile=None, tryDropAndReload=True, extraRunargs={}, runtimeRunargs={}, literalRunargs=[], dataArgs=[], checkEventCount=False, errorMaskFiles=None, manualDataDictionary=None, memMonitor=True, disableMT=False, disableMP=False, onlyMP=False, onlyMT=False, onlyMPWithRunargs=None)
Initialise athena executor.
bool _athenaMPReadEventOrders
inputDataTypeCountCheck(self)
_prepAthenaCommandLine(self)
Prepare the correct command line to be used to invoke athena.
str _athenaMPWorkerTopDir
_isCAEnabled(self)
Check if running with CA.
_skeletonCA
Handle MPI setup.
_tryDropAndReload
Add --drop-and-reload if possible (and allowed!).
_envUpdate
Look for environment updates and prepare the athena command line.
Specialise the script executor to deal with the BS merge oddity of excluding empty DRAWs.
preExecute(self, input=set(), output=set())
__init__(self, name='Dummy', trf=None, conf=None, inData=set(), outData=set())
__init__(self, name='Echo', trf=None)
setFromTransform(self, trf)
Set configuration properties from the parent transform.
addToArgdict(self, key, value)
Add a new object to the argdict.
addToDataDictionary(self, key, value)
Add a new object to the dataDictionary.
__init__(self, argdict={}, dataDictionary={}, firstExecutor=False)
Configuration for an executor.
Special executor that will enable a logfile scan as part of its validation.
__init__(self, name='Logscan')
preExecute(self, input=set(), output=set())
Athena executor where failure is not considered fatal.
Specialist executor to manage the handling of multiple implicit input and output files within the der...
preExecute(self, input=set(), output=set())
Take inputDAODFile and setup the actual outputs needed in this job.
_buildStandardCommand(self)
preExecute(self, input=set(), output=set())
__init__(self, name='Script', trf=None, conf=None, inData=set(), outData=set(), exe=None, exeArgs=None, memMonitor=True)
Logfile suitable for scanning logfiles with an athena flavour, i.e., lines of the form "SERVICE LOGL...
Small class used for validating event counts between input and output files.
Class of patterns that can be ignored from athena logfiles.
std::vector< std::string > intersection(std::vector< std::string > &v1, std::vector< std::string > &v2)
bool add(const std::string &hname, TKey *tobj)
std::vector< std::string > split(const std::string &s, const std::string &t=":")