import os.path as path

from fnmatch import fnmatch

msg = logging.getLogger(__name__)
from PyJobTransforms.trfUtils import cvmfsDBReleaseCheck, forceToAlphaNum, \
    ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix

import PyJobTransforms.trfExceptions as trfExceptions

import PyJobTransforms.trfEnv as trfEnv
enc = s.encoding.lower()
if enc.find('ascii') >= 0 or enc.find('ansi') >= 0:
    return open(s.fileno(), 'w', encoding='utf-8')
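# A minimal sketch (the helper name is hypothetical, not part of this module)
# of the same idea: if a stream advertises an ASCII/ANSI encoding, reopen its
# file descriptor so that subsequent writes are UTF-8.
import sys

def _utf8_stream(s):
    enc = (s.encoding or '').lower()
    if 'ascii' in enc or 'ansi' in enc:
        # closefd=False keeps the interpreter's own descriptor alive
        return open(s.fileno(), 'w', encoding='utf-8', closefd=False)
    return s

stdout_utf8 = _utf8_stream(sys.stdout)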
def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False):

@dataDictionary.setter

@totalExecutorSteps.setter
def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
if len(dataOverlap) > 0:
    raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_ERROR'),
                                                'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self._name, ' '.join(dataOverlap)))
self.conf.setFromTransform(trf)
if '_substep' in dir(self):

if '_trf' in dir(self):

if '_inData' in dir(self):

if '_inData' in dir(self):

if '_outData' in dir(self):

if '_outData' in dir(self):

if '_input' in dir(self):

if '_output' in dir(self):

if hasattr(self, '_first'):
msg.info('Preexecute for %s', self._name)

msg.info('Starting execution of %s', self._name)

msg.info('%s executor returns %d', self._name, self._rc)

msg.info('Postexecute for %s', self._name)

msg.info('Executor %s has no validation function - assuming all ok', self._name)
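# A minimal sketch (the _sketchExecutor class and its logger are hypothetical,
# not part of the transform framework) of the lifecycle these messages trace:
# preExecute -> execute -> postExecute -> validate.
import logging

_sketch_msg = logging.getLogger(__name__)

class _sketchExecutor(object):
    def __init__(self, name='sketch'):
        self._name = name
        self._rc = 0

    def preExecute(self, input=set(), output=set()):
        _sketch_msg.info('Preexecute for %s', self._name)

    def execute(self):
        _sketch_msg.info('Starting execution of %s', self._name)
        self._rc = 0  # a real executor records its child's return code here
        _sketch_msg.info('%s executor returns %d', self._name, self._rc)

    def postExecute(self):
        _sketch_msg.info('Postexecute for %s', self._name)

    def validate(self):
        _sketch_msg.info('Executor %s has no validation function - assuming all ok', self._name)

exe = _sketchExecutor()
exe.preExecute()
exe.execute()
exe.postExecute()
exe.validate()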
super(logscanExecutor, self).__init__(name=name)
msg.info('Preexecute for %s', self._name)
if 'logfile' in self.conf.argdict:

msg.info("Starting validation for {0}".format(self._name))

if 'ignorePatterns' in self.conf.argdict:
    igPat = self.conf.argdict['ignorePatterns'].value

if 'ignoreFiles' in self.conf.argdict:

worstError = self._logScan.worstError()
if worstError['firstError']:
    if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
        if 'CoreDumpSvc' in worstError['firstError']['message']:
            exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
        elif 'G4Exception' in worstError['firstError']['message']:
            exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
        else:
            exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
    else:
        exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
else:
    exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
    raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
                                                       'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
msg.info('Executor {0} has validated successfully'.format(self.name))
super(echoExecutor, self).__init__(name=name, trf=trf)

msg.info('Starting execution of %s', self._name)
msg.info('Transform argument dictionary now follows:')
for k, v in self.conf.argdict.items():
    print("%s = %s" % (k, v))

msg.info('%s executor returns %d', self._name, self._rc)
def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):

super(dummyExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)

msg.info('Starting execution of %s', self._name)

for k, v in self.conf.argdict.items():
    msg.info('Creating dummy output file: {0}'.format(self.conf.argdict[k].value[0]))
    open(self.conf.argdict[k].value[0], 'a').close()

msg.info('%s executor returns %d', self._name, self._rc)
def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData = set(),
             exe = None, exeArgs = None, memMonitor = True):

super(scriptExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
msg.debug('scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

if self._cmd is None:

msg.info('Will execute script as %s', self._cmd)

if 'TRF_ECHO' in os.environ:
    msg.info('TRF_ECHO envvar is set - enabling command echoing to stdout')
elif 'TRF_NOECHO' in os.environ:
    msg.info('TRF_NOECHO envvar is set - disabling command echoing to stdout')

msg.info('Interactive environment detected (stdin or stdout is a tty) - enabling command echoing to stdout')

elif 'TZHOME' in os.environ:
    msg.info('Tier-0 environment detected - enabling command echoing to stdout')

msg.info('Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self._name, self._logFileName))
encargs = {'encoding': 'utf-8'}

self._exeLogFile.setFormatter(logging.Formatter('%(asctime)s %(message)s', datefmt='%H:%M:%S'))

self._echostream.setFormatter(logging.Formatter('%(name)s %(asctime)s %(message)s', datefmt='%H:%M:%S'))
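# A minimal, self-contained sketch (handler and logger names are hypothetical)
# of the echo pattern above: one handler writes the substep logfile, a second
# mirrors each record to stdout, each with its own timestamped format.
import logging
import sys

echo_logger = logging.getLogger('sketch.echo')
echo_logger.setLevel(logging.INFO)

file_handler = logging.FileHandler('log.sketch', mode='w', encoding='utf-8')
file_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s', datefmt='%H:%M:%S'))

stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(logging.Formatter('%(name)s %(asctime)s %(message)s', datefmt='%H:%M:%S'))

echo_logger.addHandler(file_handler)
echo_logger.addHandler(stream_handler)
echo_logger.info('this line goes to both the logfile and stdout')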
raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                'No executor set in {0}'.format(self.__class__.__name__))

if arg in self.conf.argdict:

if isinstance(self.conf.argdict[arg].value, list):
msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))

if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
    msg.info('execOnly flag is set - execution will now switch, replacing the transform')

encargs = {'encoding': 'utf8'}

msg.info("chdir /srv to launch a nested container for the substep")

p = subprocess.Popen(self._cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, **encargs)

msg.info("chdir {} after launching the nested container".format(self._workdir))
os.chdir(self._workdir)
memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,

mem_proc = subprocess.Popen(memMonitorCommand, shell=False, close_fds=True, **encargs)
except Exception as e:
    msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))

while p.poll() is None:
    line = p.stdout.readline()

for line in p.stdout:

msg.info('%s executor returns %d', self._name, self._rc)

errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
mem_proc.send_signal(signal.SIGUSR1)

while mem_proc.poll() is None and countWait < 10:
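# A minimal sketch (not the module's code; it assumes the prmon binary is on
# PATH) of the monitor lifecycle used here: attach prmon to a running child,
# ask it to flush with SIGUSR1 once the child is done, then give the monitor
# a bounded grace period to exit.
import signal
import subprocess
import time

child = subprocess.Popen(['sleep', '5'])
monitor = subprocess.Popen(['prmon', '--pid', str(child.pid), '--filename', 'prmon.full.sketch'],
                           shell=False, close_fds=True)
child.wait()
monitor.send_signal(signal.SIGUSR1)
countWait = 0
while monitor.poll() is None and countWait < 10:
    time.sleep(0.1)
    countWait += 1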
except Exception as e:

msg.info('Executor {0} validated successfully (return code {1})'.format(self._name, self._rc))
if trfExit.codeToSignalname(self._rc) != "":
    self._errMsg = '{0} got a {1} signal (exit code {2})'.format(self._name, trfExit.codeToSignalname(self._rc), self._rc)
else:
    self._errMsg = 'Non-zero return code from %s (%d)' % (self._name, self._rc)
if 'checkEventCount' in self.conf.argdict and self.conf.argdict['checkEventCount'].returnMyValue(exe=self) is False:
    msg.info('Event counting for substep {0} is skipped'.format(self.name))

if 'mpi' in self.conf.argdict and not mpi.mpiShouldValidate():
    msg.info('MPI mode -- skipping output event count check')

msg.info('Event counting for substep {0} passed'.format(self.name))

_exitMessageLimit = 200
_defaultIgnorePatternFile = ['atlas_error_mask.db']
def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
             inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'],
             substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = {}, runtimeRunargs = {},
             literalRunargs = [], dataArgs = [], checkEventCount = False, errorMaskFiles = None,
             manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):

msg.debug("Resource monitoring from PerfMon is now deprecated")

if isinstance(skeletonFile, str):

super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                     exeArgs=exeArgs, memMonitor=memMonitor)
self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')

@inputDataTypeCountCheck.setter
msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':

thisInputEvents = self.conf.dataDictionary[dataType].nentries
if thisInputEvents > inputEvents:
    inputEvents = thisInputEvents
if ('skipEvents' in self.conf.argdict and
        self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
    mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)

if ('maxEvents' in self.conf.argdict and
        self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
    myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
        'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))

if myMaxEvents != -1:
    expectedEvents = myMaxEvents

    expectedEvents = min(inputEvents - mySkipEvents, myMaxEvents)
else:
    expectedEvents = inputEvents - mySkipEvents

msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
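# A short worked example (all values illustrative) of the event arithmetic
# above: with 1000 input events, skipEvents=100 and maxEvents=500 the substep
# should see min(1000 - 100, 500) = 500 events; with maxEvents left at -1 it
# would see 1000 - 100 = 900.
inputEvents, mySkipEvents, myMaxEvents = 1000, 100, 500
expectedEvents = min(inputEvents - mySkipEvents, myMaxEvents) if myMaxEvents != -1 else inputEvents - mySkipEvents
assert expectedEvents == 500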
OSSetupString = None

legacyThreadingRelease = False
if 'asetup' in self.conf.argdict:
    asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)

if asetupString is not None:
    currentOS = os.environ['ALRB_USER_PLATFORM']
    if legacyOSRelease and "centos7" not in currentOS:
        OSSetupString = "centos7"
        msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
if 'runInContainer' in self.conf.argdict:
    OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
    msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
if OSSetupString is not None and asetupString is None:
        '--asetup must be used for the substep which requires --runInContainer')
if k in self.conf._argdict:

if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or
     ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
        ('ATHENA_CORE_NUMBER' not in os.environ)):

    msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
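# A minimal sketch (not the module's code) of the gate above: the concurrency
# flags only take effect when ATHENA_CORE_NUMBER is exported, e.g.
#   export ATHENA_CORE_NUMBER=8
import os

want_concurrency = True  # stands in for --multithreaded/--multiprocess
cores = int(os.environ.get('ATHENA_CORE_NUMBER', '0'))
if want_concurrency and cores == 0:
    print('ATHENA_CORE_NUMBER not set - Athena would continue in serial mode')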
msg.info("This configuration does not support MT, falling back to MP")

msg.info("This configuration does not support MP, using MT")

msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
if self.conf.totalExecutorSteps > 1:
    for dataType in output:
        if self.conf._dataDictionary[dataType].originalName:
            self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
        else:
            self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
        self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
        msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:

if ('athenaMPStrategy' in self.conf.argdict and
        (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
if 'athenaMPMergeTargetSize' in self.conf.argdict:
    for dataType in output:
        if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
            self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000
            msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
        else:
            matchedViaGlob = False
            for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
                if fnmatch(dataType, mtsType):
                    self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000
                    msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
                    matchedViaGlob = True
            if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
                self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000
                msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
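# A self-contained sketch (a plain dict stands in for the argument object) of
# the lookup order above: exact datatype key first, then glob patterns via
# fnmatch, then an "ALL" fallback; sizes are given in MB and scaled to bytes.
from fnmatch import fnmatch

mergeTargetSize = {'AOD': 1000, 'DAOD_*': 500, 'ALL': 250}

def target_size(dataType):
    if dataType in mergeTargetSize:
        return mergeTargetSize[dataType] * 1000000
    for pattern, size in mergeTargetSize.items():
        if fnmatch(dataType, pattern):
            return size * 1000000
    if 'ALL' in mergeTargetSize:
        return mergeTargetSize['ALL'] * 1000000
    return 0

assert target_size('AOD') == 1000000000
assert target_size('DAOD_PHYS') == 500000000
assert target_size('HIST') == 250000000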
for dataType in output:
    if self.conf.totalExecutorSteps <= 1:
        self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
    if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
        if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
            msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
        else:
            self.conf._dataDictionary[dataType].value[0] += "_000"
            msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
if 'mpi' in self.conf.argdict:
    msg.info("Running in MPI mode")
    mpi.setupMPIConfig(output, self.conf.dataDictionary)

for dataType in input:
    inputFiles[dataType] = self.conf.dataDictionary[dataType]
outputFiles = dict()
for dataType in output:
    outputFiles[dataType] = self.conf.dataDictionary[dataType]
for dataType, dataArg in self.conf.dataDictionary.items():
    if isinstance(dataArg, list) and dataArg:
        if self.conf.totalExecutorSteps <= 1:
            raise ValueError('Multiple input arguments provided but only running one substep')
        if self.conf.totalExecutorSteps != len(dataArg):
            raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')

        if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
            inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
    else:
        if dataArg.io == 'input' and nameForFiles in dataArg.executor:
            inputFiles[dataArg.subtype] = dataArg

msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

    output = outputFiles)
dbrelease = dbsetup = None
if 'DBRelease' in self.conf.argdict:
    dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
    if path.islink(dbrelease):
        dbrelease = path.realpath(dbrelease)

dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))

msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
if not os.access(dbrelease, os.R_OK):
    msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
    msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
    dbrelease = dbdMatch.group(1)

msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
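# A quick illustration (the filename is invented) of the tarball pattern used
# above: the version string is captured from names like DBRelease-31.8.1.tar.gz.
import re

m = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', 'DBRelease-31.8.1.tar.gz')
assert m is not None and m.group(1) == '31.8.1'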
super(athenaExecutor, self).preExecute(input, output)

msg.info('Now writing wrapper for substep executor {0}'.format(self._name))

msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
if 'mpi' in self.conf.argdict:

if self.conf.totalExecutorSteps > 1:

outputDataDictionary = dict([(dataType, self.conf.dataDictionary[dataType]) for dataType in self._output])

if self.conf.executorStep == self.conf.totalExecutorSteps - 1:

for i in range(self.conf.totalExecutorSteps):
    for v in self.conf.dataDictionary[dataType].value:
        newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
                                  '_{0}{1}_'.format(executorStepSuffix, i)))
self.conf.dataDictionary[dataType].multipleOK = True

for i in range(self.conf.totalExecutorSteps):
    newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
self.conf.dataDictionary[dataType].value = newValue

if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:

outputDataDictionary = dict([(dataType, self.conf.dataDictionary[dataType]) for dataType in self._output])
skipFileChecks = False
if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:

if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:

if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:

msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
    inputDataDictionary = dict([(dataType, self.conf.dataDictionary[dataType]) for dataType in self._input])

    for k, v in inputDataDictionary.items():
        if not v.io == 'temporary':

        for filename in v.value:
            if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
                msg.info("Removing intermediate {0} input file {1}".format(k, filename))

                if os.path.realpath(filename) != filename:
                    targetpath = os.path.realpath(filename)
                    if targetpath and os.access(targetpath, os.R_OK):
                        os.unlink(targetpath)
deferredException = None
memLeakThreshold = 5000

super(athenaExecutor, self).validate()

msg.error('Validation of return code failed: {0!s}'.format(e))
deferredException = e

msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))

msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')

msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))

msg.info('No memory monitor file to be analysed')
if 'ignorePatterns' in self.conf.argdict:
    igPat = self.conf.argdict['ignorePatterns'].value

if 'ignoreFiles' in self.conf.argdict:

    ignoreList=ignorePatterns)
worstError = self._logScan.worstError()
eventLoopWarnings = self._logScan.eventLoopWarnings()
if worstError['firstError']:
    if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
        if 'CoreDumpSvc' in worstError['firstError']['message']:
            exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
        elif 'G4Exception' in worstError['firstError']['message']:
            exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
        else:
            exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
    else:
        exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
else:
    exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
if deferredException is not None:
    if worstError['nLevel'] >= stdLogLevels['ERROR']:
        deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)

    deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
    raise deferredException
if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
elif worstError['nLevel'] >= stdLogLevels['ERROR'] and (not mpi.mpiShouldValidate()):
    msg.warning(f"Found {worstError['level']} in the logfile in MPI rank {mpi.getMPIRank()} but moving on to be failure-tolerant")
elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))

    exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
    raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
                                                       'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
if len(eventLoopWarnings) > 0:
    msg.warning('Found WARNINGS in the event loop, as follows:')
    for element in eventLoopWarnings:
        msg.warning('{0} {1} ({2} instances)'.format(element['item']['service'], element['item']['message'], element['count']))

msg.info('Executor {0} has validated successfully'.format(self.name))
if 'CA' not in self.conf.argdict:

if self.conf.argdict['CA'] is None:

if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
if 'athena' in self.conf.argdict:

currentSubstep = None
if 'athenaopts' in self.conf.argdict:
    if currentName in self.conf.argdict['athenaopts'].value:
        currentSubstep = currentName
        if self.substep in self.conf.argdict['athenaopts'].value:
            msg.info('Athenaopts found for {0} and {1}, joining options. '
                     'Consider changing your configuration to use just the name or the alias of the substep.'
                     .format(currentName, self.substep))
            self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
            del self.conf.argdict['athenaopts'].value[self.substep]
            msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
    elif self.substep in self.conf.argdict['athenaopts'].value:

    elif 'all' in self.conf.argdict['athenaopts'].value:
        currentSubstep = 'all'
preLoadUpdated = dict()

preLoadUpdated[currentSubstep] = False
if 'athenaopts' in self.conf.argdict:
    if currentSubstep is not None:
        for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:

            if athArg.startswith('--preloadlib'):

                i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
                v = athArg.split('=', 1)[1]
                msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))

                self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
            except Exception as e:
                msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
            preLoadUpdated[currentSubstep] = True
if not preLoadUpdated[currentSubstep]:

if 'athenaopts' in self.conf.argdict:
    if currentSubstep is not None:

if 'athenaopts' in self.conf.argdict:
    if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:

    elif currentSubstep in self.conf.argdict['athenaopts'].value:
        self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])

if currentSubstep is None:
    currentSubstep = 'all'
msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
    msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
elif 'athenaopts' in self.conf.argdict:
    athenaConfigRelatedOpts = ['--config-only', '--drop-and-reload']

    if currentSubstep in self.conf.argdict['athenaopts'].value:
        conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
        if len(conflictOpts) > 0:
            msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
        else:
            msg.info('Appending "--drop-and-reload" to athena options')
    else:
        msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))

    msg.info('Appending "--drop-and-reload" to athena options')

msg.info('Skipping test for "--drop-and-reload" in this executor')
if not ('athenaopts' in self.conf.argdict and
        any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):

if not ('athenaopts' in self.conf.argdict and
        any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):

msg.info('Updated script arguments with topoptions: %s', self._cmd)
setupATLAS = 'my_setupATLAS.sh'
with open(setupATLAS, 'w') as f:
    print("#!/bin/bash", file=f)
    print("""if [ -z "$ATLAS_LOCAL_ROOT_BASE" ]; then
    export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
fi
source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh""", file=f)
os.chmod(setupATLAS, 0o755)
'Preparing wrapper file {wrapperFileName} with '
'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
container_cmd = None

print('#!/bin/sh', file=wrapper)

container_cmd = [os.path.abspath(setupATLAS),

print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
print('echo " ' + " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
      file=wrapper)
print('echo "     and use --pwd workdir, where workdir is the transform running directory within /srv"',
      file=wrapper)
print('echo', file=wrapper)
print(f'source ./{setupATLAS} -q', file=wfile)
print(f'asetup {asetup}', file=wfile)
print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
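# A minimal sketch (file names and the asetup release string are illustrative)
# of the wrapper pattern above: write a small shell script that performs the
# environment setup and aborts with exit code 255 if the setup step fails.
import os

with open('sketch_wrapper.sh', 'w') as wfile:
    print('#!/bin/sh', file=wfile)
    print('source ./my_setupATLAS.sh -q', file=wfile)
    print('asetup Athena,24.0,latest', file=wfile)  # hypothetical release
    print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
os.chmod('sketch_wrapper.sh', 0o755)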
dbroot = path.dirname(dbsetup)
dbversion = path.basename(dbroot)
print("# DBRelease setup", file=wrapper)
print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)

print("# AthenaMT explicitly disabled for this executor", file=wrapper)

print("# AthenaMP explicitly disabled for this executor", file=wrapper)

if not envSetting.startswith('LD_PRELOAD'):
    print("export", envSetting, file=wrapper)
if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
    msg.info('Valgrind engaged')

    AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(

    print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
    print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)

    if 'valgrindDefaultOpts' in self.conf._argdict:
        defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
    else:
        defaultOptions = True
    if 'valgrindExtraOpts' in self.conf._argdict:
        extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
    else:
        extraOptionsList = None
    msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
    msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))

    command = ValgrindCommand(
        defaultOptions = defaultOptions,
        extraOptionsList = extraOptionsList,
        AthenaSerialisedConfigurationFile = AthenaSerialisedConfigurationFile
    )
    msg.debug("Valgrind command: {command}".format(command = command))
    print(command, file=wrapper)
elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
    msg.info('VTune engaged')

    AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(

    print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
    print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)

    if 'vtuneDefaultOpts' in self.conf._argdict:
        defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
    else:
        defaultOptions = True
    if 'vtuneExtraOpts' in self.conf._argdict:
        extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
    else:
        extraOptionsList = None

    AthenaCommand = self._cmd
    AthenaCommand.append(AthenaSerialisedConfigurationFile)

    msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
    msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))

    command = VTuneCommand(
        defaultOptions = defaultOptions,
        extraOptionsList = extraOptionsList,
        AthenaCommand = AthenaCommand
    )
    msg.debug("VTune command: {command}".format(command = command))
    print(command, file=wrapper)
else:
    msg.info('Valgrind/VTune not engaged')
except OSError as e:
    errMsg = 'error writing athena wrapper {fileName}: {error}'.format(

        trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),

self._cmd = container_cmd + self._cmd
if 'selfMerge' not in dir(fileArg):
    msg.info('Files in {0} cannot be merged (no selfMerge() method is implemented)'.format(fileArg.name))

if fileArg.mergeTargetSize == 0:
    msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))

mergeCandidates = [list()]
currentMergeSize = 0
for fname in fileArg.value:
    size = fileArg.getSingleMetadata(fname, 'file_size')
    if not isinstance(size, int):
        msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))

    if len(mergeCandidates[-1]) == 0:
        msg.debug('Adding file {0} to current empty merge list'.format(fname))
        mergeCandidates[-1].append(fname)
        currentMergeSize += size
    else:
        if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
            msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
            mergeCandidates[-1].append(fname)
            currentMergeSize += size
        else:
            msg.debug('Starting a new merge list with file {0}'.format(fname))
            mergeCandidates.append([fname])
            currentMergeSize = size

msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
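# A self-contained sketch of the greedy grouping above (sizes in bytes are
# illustrative): each file joins the current group while that keeps the group
# closer to the target size, otherwise it starts a new group.
import math

def group_for_merge(sizes, target):
    groups, current = [[]], 0
    for size in sizes:
        if not groups[-1]:
            groups[-1].append(size)
            current = size
        elif target < 0 or math.fabs(current + size - target) < math.fabs(current - target):
            groups[-1].append(size)
            current += size
        else:
            groups.append([size])
            current = size
    return groups

# Two files of 400 reach 800; a third 400 would overshoot the 1000 target by
# exactly as much as the group currently undershoots it, so it starts a new
# group rather than joining.
assert group_for_merge([400, 400, 400], 1000) == [[400, 400], [400]]
assert group_for_merge([800, 800], 1000) == [[800], [800]]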
if len(mergeCandidates) == 1:

mergeNames = [fileArg.originalName]

for mergeGroup in mergeCandidates:

    mergeName = fileArg.originalName + '_{0}'.format(counter)
    while path.exists(mergeName):

        mergeName = fileArg.originalName + '_{0}'.format(counter)
    mergeNames.append(mergeName)

for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
    msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
    if len(mergeGroup) <= 1:
        msg.info('Skip merging for single file')

    self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
if os.path.exists(targetTGZName):
    os.remove(targetTGZName)

fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")

tar = tarfile.open(targetTGZName, "w:gz")
for fName in os.listdir('.'):
    matches = fNameRE.findall(fName)
    if len(matches) > 0:
        if fNameRE.findall(fName)[0] == fName:
            msg.info('adding %s to %s', fName, targetTGZName)

msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
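# A minimal sketch (the output name is illustrative) of the compression step
# above: gather files whose names match the JiveXML pattern into a gzipped
# tarball.
import os
import re
import tarfile

fNameRE = re.compile(r"JiveXML\_\d+\_\d+\.xml")
with tarfile.open('JiveXML.tar.gz', 'w:gz') as tar:
    for fName in os.listdir('.'):
        if fNameRE.fullmatch(fName):
            tar.add(fName)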
super(optionalAthenaExecutor, self).validate()

msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
             inData = set(), outData = set(), exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True,
             perfMonFile = None, tryDropAndReload = True, extraRunargs = {},
             manualDataDictionary = None, memMonitor = True):

super(POOLMergeExecutor, self).__init__(name, trf=trf, conf=conf, skeletonFile=skeletonFile, skeletonCA=skeletonCA,
                                        inData=inData, outData=outData, exe=exe, exeArgs=exeArgs, substep=substep,
                                        inputEventTest=inputEventTest, perfMonFile=perfMonFile,
                                        tryDropAndReload=tryDropAndReload, extraRunargs=extraRunargs,
                                        manualDataDictionary=manualDataDictionary, memMonitor=memMonitor)

super(POOLMergeExecutor, self).preExecute(input=input, output=output)

super(POOLMergeExecutor, self).execute()
msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
if 'NTUP_PILEUP' not in output:

if 'formats' not in self.conf.argdict:
        'No derivation configuration specified')

if ('DAOD' not in output) and ('D2AOD' not in output):
        'No base name for DAOD output')
if 'formats' in self.conf.argdict:
    formatList = self.conf.argdict['formats'].value
for reduction in formatList:
    if 'DAOD' in output:
        dataType = 'DAOD_' + reduction
        if 'augmentations' not in self.conf.argdict:
            outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
        else:
            for val in self.conf.argdict['augmentations'].value:
                if reduction in val.split(':')[0]:
                    outputName = 'DAOD_' + val.split(':')[1] + '.' + self.conf.argdict['outputDAODFile'].value[0]

            outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
    if 'D2AOD' in output:
        dataType = 'D2AOD_' + reduction
        outputName = 'D2AOD_' + reduction + '.' + self.conf.argdict['outputD2AODFile'].value[0]

    msg.info('Adding reduction output type {0}'.format(dataType))
    output.add(dataType)

    self.conf.dataDictionary[dataType] = newReduction

if 'DAOD' in output:
    output.remove('DAOD')
    del self.conf.dataDictionary['DAOD']
    del self.conf.argdict['outputDAODFile']
if 'D2AOD' in output:
    output.remove('D2AOD')
    del self.conf.dataDictionary['D2AOD']
    del self.conf.argdict['outputD2AODFile']

msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
msg.info('Input/Output: {0}/{1}'.format(input, output))

msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
msg.info('Input/Output: {0}/{1}'.format(input, output))
super(reductionFrameworkExecutor, self).preExecute(input, output)
def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']),
             exe='DQHistogramMerge.py', exeArgs = [], memMonitor = True):

super(DQMergeExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                      exeArgs=exeArgs, memMonitor=memMonitor)

msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

super(DQMergeExecutor, self).preExecute(input=input, output=output)
for dataType in input:
    for fname in self.conf.dataDictionary[dataType].value:
        self.conf.dataDictionary[dataType]._getNumberOfEvents([fname])
        print(fname, file=DQMergeFile)

if len(output) != 1:
        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
outDataType = list(output)[0]

if self.conf._argdict.get("run_post_processing", False):

if self.conf._argdict.get("is_incremental_merge", False):

for k in ("excludeHist", "excludeDir"):
    if k in self.conf._argdict:
super(DQMergeExecutor, self).validate()

exitErrorMessage = ''

worstError = logScan.worstError()

if worstError['firstError']:
    if len(worstError['firstError']['message']) > logScan._msgLimit:
        exitErrorMessage = "Long {0} message at line {1}" \
                           " (see jobReport for further details)".format(worstError['level'],
                                                                         worstError['firstError']['firstLine'])

        worstError['firstError']['message'])
except OSError as e:
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))

if worstError['nLevel'] == stdLogLevels['ERROR'] and (
        'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')

msg.info('Executor {0} has validated successfully'.format(self.name))
def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']),
             exe='DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor = True):

super(DQMPostProcessExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                             exeArgs=exeArgs, memMonitor=memMonitor)

msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

super(DQMPostProcessExecutor, self).preExecute(input=input, output=output)
dsName = self.conf.argdict["inputHISTFile"].dataset

for dataType in input:
    for fname in self.conf.dataDictionary[dataType].value:

        if not dsName:
            dsName = ".".join(fname.split('.')[0:4])
        inputList.append("#".join([dsName, fname]))

if len(output) != 1:
        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
outDataType = list(output)[0]

wrapperParams = {"inputHistFiles": inputList,
                 "outputHistFile": dsName + "#" + self.conf.dataDictionary[outDataType].value[0],
                 "incrementalMode": "True" if self.conf._argdict.get("is_incremental_merge", False) else "False",
                 "postProcessing": "True" if self.conf._argdict.get("run_post_processing", False) else "False",
                 "doWebDisplay": "True" if self.conf._argdict.get("doWebDisplay", False) else "False",
                 "allowCOOLUpload": "True" if self.conf._argdict.get("allowCOOLUpload", False) else "False",

if "servers" in self.conf._argdict:
    wrapperParams["server"] = self.conf._argdict["servers"]

for k in ("excludeHist", "excludeDir"):
    if k in self.conf._argdict:
        wrapperParams["mergeParams"] += (" --{0}={1}".format(k, self.conf._argdict[k]))

with open("args.json", "w") as f:
    json.dump(wrapperParams, f)
super(DQMPostProcessExecutor, self).validate()

exitErrorMessage = ''

worstError = logScan.worstError()

if worstError['firstError']:
    if len(worstError['firstError']['message']) > logScan._msgLimit:
        exitErrorMessage = "Long {0} message at line {1}" \
                           " (see jobReport for further details)".format(worstError['level'],
                                                                         worstError['firstError']['firstLine'])

        worstError['firstError']['message'])
except OSError as e:
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))

if worstError['nLevel'] == stdLogLevels['ERROR'] and (
        'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')

msg.info('Executor {0} has validated successfully'.format(self.name))
msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

if self._exe is None:

if len(output) != 1:
        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
outDataType = list(output)[0]

for dataType in input:

super(NTUPMergeExecutor, self).preExecute(input=input, output=output)
2165 """Executor for running physvalPostProcessing.py with <input> <output> args"""
2169 msg.debug(
'[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.
format(self.
name, input, output))
2173 if len(input) != 1
or len(output) != 1:
2175 f
'Exactly one input and one output must be specified (got inputs={len(input)}, outputs={len(output)})')
2181 super(NtupPhysValPostProcessingExecutor, self).
preExecute(input=input, output=output)
if 'maskEmptyInputs' in self.conf.argdict and self.conf.argdict['maskEmptyInputs'].value is True:

    for fname in self.conf.dataDictionary[self._inputBS].value:
        nEvents = self.conf.dataDictionary[self._inputBS].getSingleMetadata(fname, 'nentries')
        msg.debug('Found {0} events in file {1}'.format(nEvents, fname))
        if isinstance(nEvents, int) and nEvents > 0:
            eventfullFiles.append(fname)

msg.info('The following input files are masked because they have 0 events: {0}'.format(' '.join(self._maskedFiles)))
if len(eventfullFiles) == 0:
    if 'emptyStubFile' in self.conf.argdict and path.exists(self.conf.argdict['emptyStubFile'].value):

        msg.info("All input files are empty - will use stub file {0} as output".format(self.conf.argdict['emptyStubFile'].value))

        'All input files had zero events - aborting BS merge')
for fname in self.conf.dataDictionary[self._inputBS].value:

    print(fname, file=BSFileList)
except OSError as e:

elif self.conf.argdict['allowRename'].value is True:

    msg.info('Output filename does not end in "._0001.data"; will proceed, but be aware that the internal filename metadata will be wrong')

errmsg = 'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'

super(bsMergeExecutor, self).preExecute(input=input, output=output)

msg.info("Using stub file for empty BS output - execution is fake")

super(bsMergeExecutor, self).execute()
except OSError as e:

if 'outputArchFile' not in self.conf.argdict:

with open('zip_wrapper.py', 'w') as zip_wrapper:
    print("import zipfile, os, shutil", file=zip_wrapper)
    if os.path.exists(self.conf.argdict['outputArchFile'].value[0]):
        print("zf = zipfile.ZipFile('{}', mode='a', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
    else:
        print("zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
    print("for f in {}:".format(self.conf.argdict['inputDataFile'].value), file=zip_wrapper)

    print("    if zipfile.is_zipfile(f) and '.zip' in f:", file=zip_wrapper)
    print("        archive = zipfile.ZipFile(f, mode='r')", file=zip_wrapper)
    print("        print('Extracting input zip file {0} to temporary directory {1}'.format(f, 'tmp'))", file=zip_wrapper)
    print("        archive.extractall('tmp')", file=zip_wrapper)
    print("        archive.close()", file=zip_wrapper)

    print("        if os.access(f, os.F_OK):", file=zip_wrapper)
    print("            print('Removing input zip file {}'.format(f))", file=zip_wrapper)
    print("            os.unlink(f)", file=zip_wrapper)
    print("        if os.path.isdir('tmp'):", file=zip_wrapper)
    print("            for root, dirs, files in os.walk('tmp'):", file=zip_wrapper)
    print("                for name in files:", file=zip_wrapper)
    print("                    print('Zipping {}'.format(name))", file=zip_wrapper)
    print("                    zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
    print("            shutil.rmtree('tmp')", file=zip_wrapper)
    print("    else:", file=zip_wrapper)
    print("        print('Zipping {}'.format(os.path.basename(f)))", file=zip_wrapper)
    print("        zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
    print("        if os.access(f, os.F_OK):", file=zip_wrapper)
    print("            print('Removing input file {}'.format(f))", file=zip_wrapper)
    print("            os.unlink(f)", file=zip_wrapper)
    print("zf.close()", file=zip_wrapper)
os.chmod('zip_wrapper.py', 0o755)
except OSError as e:
    errMsg = 'error writing zip wrapper {fileName}: {error}'.format(fileName='zip_wrapper.py',
elif self._exe == 'unarchive':
    for infile in self.conf.argdict['inputArchFile'].value:
        if not zipfile.is_zipfile(infile):
                'An input file is not a zip archive - aborting unpacking')
    self._cmd = ['python']

    with open('unarchive_wrapper.py', 'w') as unarchive_wrapper:
        print("import zipfile", file=unarchive_wrapper)
        print("for f in {}:".format(self.conf.argdict['inputArchFile'].value), file=unarchive_wrapper)
        print("    archive = zipfile.ZipFile(f, mode='r')", file=unarchive_wrapper)
        print("    path = '{}'".format(self.conf.argdict['path']), file=unarchive_wrapper)
        print("    print('Extracting archive {0} to {1}'.format(f, path))", file=unarchive_wrapper)
        print("    archive.extractall(path)", file=unarchive_wrapper)
        print("    archive.close()", file=unarchive_wrapper)
    os.chmod('unarchive_wrapper.py', 0o755)
except OSError as e:
    errMsg = 'error writing unarchive wrapper {fileName}: {error}'.format(fileName='unarchive_wrapper.py',

super(archiveExecutor, self).preExecute(input=input, output=output)