import json
import logging
import math
import os
import os.path as path
import re
import signal
import subprocess
import tarfile
import zipfile

from fnmatch import fnmatch

msg = logging.getLogger(__name__)

from PyJobTransforms.trfExitCodes import trfExit
from PyJobTransforms.trfLogger import stdLogLevels
from PyJobTransforms.trfUtils import unpackDBRelease, \
    cvmfsDBReleaseCheck, forceToAlphaNum, \
    ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix

import PyJobTransforms.trfExceptions as trfExceptions

import PyJobTransforms.trfEnv as trfEnv
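# Fragment of a stream helper: when a text stream reports a plain ASCII/ANSI
# encoding, it is reopened so that output is written as UTF-8.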
enc = s.encoding.lower()
if enc.find('ascii') >= 0 or enc.find('ansi') >= 0:
    return open(s.fileno(), 'w', encoding='utf-8')

def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False):

@dataDictionary.setter

@totalExecutorSteps.setter
def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):

if len(dataOverlap) > 0:
        'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self._name, ' '.join(dataOverlap)))

self.conf.setFromTransform(trf)
if '_substep' in dir(self):

if '_trf' in dir(self):

if '_inData' in dir(self):

if '_inData' in dir(self):

if '_outData' in dir(self):

if '_outData' in dir(self):

if '_input' in dir(self):

if '_output' in dir(self):

if hasattr(self, '_first'):
msg.info('Preexecute for %s', self._name)

msg.info('Starting execution of %s', self._name)

msg.info('%s executor returns %d', self._name, self._rc)

msg.info('Postexecute for %s', self._name)

msg.info('Executor %s has no validation function - assuming all ok', self._name)
super(logscanExecutor, self).__init__(name=name)

msg.info('Preexecute for %s', self._name)
if 'logfile' in self.conf.argdict:

msg.info("Starting validation for {0}".format(self._name))
if 'ignorePatterns' in self.conf.argdict:
    igPat = self.conf.argdict['ignorePatterns'].value

if 'ignoreFiles' in self.conf.argdict:

worstError = self._logScan.worstError()
if worstError['firstError']:
    if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
        if 'CoreDumpSvc' in worstError['firstError']['message']:
            exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
        elif 'G4Exception' in worstError['firstError']['message']:
            exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
        else:
            exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
    else:
        exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
else:
    exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
        'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))

msg.info('Executor {0} has validated successfully'.format(self.name))
super(echoExecutor, self).__init__(name=name, trf=trf)

msg.info('Starting execution of %s', self._name)
msg.info('Transform argument dictionary now follows:')
for k, v in self.conf.argdict.items():
    print("%s = %s" % (k, v))

msg.info('%s executor returns %d', self._name, self._rc)
def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):

super(dummyExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)

msg.info('Starting execution of %s', self._name)

for k, v in self.conf.argdict.items():

    msg.info('Creating dummy output file: {0}'.format(self.conf.argdict[k].value[0]))
    open(self.conf.argdict[k].value[0], 'a').close()

msg.info('%s executor returns %d', self._name, self._rc)
def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData = set(),
             exe = None, exeArgs = None, memMonitor = True):

super(scriptExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
msg.debug('scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

if self._cmd is None:

msg.info('Will execute script as %s', self._cmd)

if 'TRF_ECHO' in os.environ:
    msg.info('TRF_ECHO envvar is set - enabling command echoing to stdout')

elif 'TRF_NOECHO' in os.environ:
    msg.info('TRF_NOECHO envvar is set - disabling command echoing to stdout')

    msg.info('Interactive environment detected (stdio or stdout is a tty) - enabling command echoing to stdout')

elif 'TZHOME' in os.environ:
    msg.info('Tier-0 environment detected - enabling command echoing to stdout')

    msg.info('Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self._name, self._logFileName))
encargs = {'encoding': 'utf-8'}

self._exeLogFile.setFormatter(logging.Formatter('%(asctime)s %(message)s', datefmt='%H:%M:%S'))

self._echostream.setFormatter(logging.Formatter('%(name)s %(asctime)s %(message)s', datefmt='%H:%M:%S'))

    'No executor set in {0}'.format(self.__class__.__name__))
if arg in self.conf.argdict:

    if isinstance(self.conf.argdict[arg].value, list):

msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))
if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
    msg.info('execOnly flag is set - execution will now switch, replacing the transform')

encargs = {'encoding': 'utf8'}

msg.info("chdir /srv to launch a nested container for the substep")

p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)

msg.info("chdir {} after launching the nested container".format(self._workdir))
os.chdir(self._workdir)
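# Launch prmon alongside the payload to sample its resource usage; the full
# time series is written to 'prmon.full.<executor name>'.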
memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,

mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds = True, **encargs)

except Exception as e:
    msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
while p.poll() is None:
    line = p.stdout.readline()

for line in p.stdout:

msg.info('%s executor returns %d', self._name, self._rc)
errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)

mem_proc.send_signal(signal.SIGUSR1)

while (not mem_proc.poll()) and countWait < 10:
except Exception as e:

msg.info('Executor {0} validated successfully (return code {1})'.format(self._name, self._rc))
if trfExit.codeToSignalname(self._rc) != "":
    self._errMsg = '{0} got a {1} signal (exit code {2})'.format(self._name, trfExit.codeToSignalname(self._rc), self._rc)
else:
    self._errMsg = 'Non-zero return code from %s (%d)' % (self._name, self._rc)
if 'checkEventCount' in self.conf.argdict and self.conf.argdict['checkEventCount'].returnMyValue(exe=self) is False:
    msg.info('Event counting for substep {0} is skipped'.format(self.name))

if 'mpi' in self.conf.argdict and not mpi.mpiShouldValidate():
    msg.info('MPI mode -- skipping output event count check')

msg.info('Event counting for substep {0} passed'.format(self.name))
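# Class-level defaults for athenaExecutor: exit messages are truncated at 200
# characters and the default ignore-pattern file is 'atlas_error_mask.db'.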
_exitMessageLimit = 200
_defaultIgnorePatternFile = ['atlas_error_mask.db']
def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
             inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'],
             substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = {}, runtimeRunargs = {},
             literalRunargs = [], dataArgs = [], checkEventCount = False, errorMaskFiles = None,
             manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):

msg.debug("Resource monitoring from PerfMon is now deprecated")

if isinstance(skeletonFile, str):

super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                     exeArgs=exeArgs, memMonitor=memMonitor)

self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
@inputDataTypeCountCheck.setter

msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':

thisInputEvents = self.conf.dataDictionary[dataType].nentries
if thisInputEvents > inputEvents:
    inputEvents = thisInputEvents
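# Work out how many events this substep should actually process, combining
# the largest input event count with any skipEvents/maxEvents arguments.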
if ('skipEvents' in self.conf.argdict and
        self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
    mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)

if ('maxEvents' in self.conf.argdict and
        self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
    myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)

if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
        'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))

if (myMaxEvents != -1):

    expectedEvents = myMaxEvents

    expectedEvents = min(inputEvents - mySkipEvents, myMaxEvents)

expectedEvents = inputEvents - mySkipEvents

msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
OSSetupString = None

legacyThreadingRelease = False
if 'asetup' in self.conf.argdict:
    asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)

if asetupString is not None:

    currentOS = os.environ['ALRB_USER_PLATFORM']
    if legacyOSRelease and "centos7" not in currentOS:
        OSSetupString = "centos7"
        msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))

if 'runInContainer' in self.conf.argdict:
    OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
    msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))

if OSSetupString is not None and asetupString is None:
        '--asetup must be used for the substep which requires --runInContainer')
if k in self.conf._argdict:

if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or
        ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
        ('ATHENA_CORE_NUMBER' not in os.environ)):

    msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
msg.info("This configuration does not support MT, falling back to MP")

msg.info("This configuration does not support MP, using MT")

msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
if self.conf.totalExecutorSteps > 1:
    for dataType in output:
        if self.conf._dataDictionary[dataType].originalName:
            self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
        else:
            self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
        self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
        msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:

if ('athenaMPStrategy' in self.conf.argdict and
        (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
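# AthenaMP merge target sizes: try an exact datatype match first, then
# fnmatch-style globs, then an "ALL" fallback; values are given in MB and
# stored as bytes.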
if 'athenaMPMergeTargetSize' in self.conf.argdict:
    for dataType in output:
        if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
            self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000
            msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
        else:
            matchedViaGlob = False
            for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
                if fnmatch(dataType, mtsType):
                    self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000
                    msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
                    matchedViaGlob = True
            if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
                self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000
                msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
for dataType in output:
    if self.conf.totalExecutorSteps <= 1:
        self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
    if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
        if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
            msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
        else:
            self.conf._dataDictionary[dataType].value[0] += "_000"
            msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
if 'mpi' in self.conf.argdict:
    msg.info("Running in MPI mode")
    mpi.setupMPIConfig(output, self.conf.dataDictionary)

for dataType in input:
    inputFiles[dataType] = self.conf.dataDictionary[dataType]
outputFiles = dict()
for dataType in output:
    outputFiles[dataType] = self.conf.dataDictionary[dataType]
for dataType, dataArg in self.conf.dataDictionary.items():
    if isinstance(dataArg, list) and dataArg:
        if self.conf.totalExecutorSteps <= 1:
            raise ValueError('Multiple input arguments provided but only running one substep')
        if self.conf.totalExecutorSteps != len(dataArg):
            raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')

        if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
            inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
    else:
        if dataArg.io == 'input' and nameForFiles in dataArg.executor:
            inputFiles[dataArg.subtype] = dataArg

msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

    output = outputFiles)
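# DBRelease handling: resolve any symlink, then accept either a classic
# DBRelease tarball (unpacked locally) or a bare version number that is
# looked up in cvmfs.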
dbrelease = dbsetup = None
if 'DBRelease' in self.conf.argdict:
    dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
    if path.islink(dbrelease):
        dbrelease = path.realpath(dbrelease)

    dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
    if dbdMatch:
        msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
        if not os.access(dbrelease, os.R_OK):
            msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
            msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
            dbrelease = dbdMatch.group(1)
        else:
            msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
            unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
super(athenaExecutor, self).preExecute(input, output)

msg.info('Now writing wrapper for substep executor {0}'.format(self._name))

msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
if 'mpi' in self.conf.argdict:

if self.conf.totalExecutorSteps > 1:

    outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])

    if self.conf.executorStep == self.conf.totalExecutorSteps - 1:

        for i in range(self.conf.totalExecutorSteps):
            for v in self.conf.dataDictionary[dataType].value:
                newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
                                          '_{0}{1}_'.format(executorStepSuffix, i)))

        self.conf.dataDictionary[dataType].multipleOK = True

        for i in range(self.conf.totalExecutorSteps):
            newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
        self.conf.dataDictionary[dataType].value = newValue

    if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:

outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])

skipFileChecks = False
if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:

    if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:

if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
    inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])

    for k, v in inputDataDictionary.items():
        if not v.io == 'temporary':
            continue
        for filename in v.value:
            if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
                msg.info("Removing intermediate {0} input file {1}".format(k, filename))

                if (os.path.realpath(filename) != filename):
                    targetpath = os.path.realpath(filename)
                    if (targetpath) and os.access(targetpath, os.R_OK):
                        os.unlink(targetpath)
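# Validation below runs in two stages: the base-class return-code check (any
# failure is kept as a deferred exception) and a logfile scan, so the most
# informative error is reported; the memory monitor output is also checked
# for a suspicious 'pss' growth slope.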
deferredException = None
memLeakThreshold = 5000

super(athenaExecutor, self).validate()

msg.error('Validation of return code failed: {0!s}'.format(e))
deferredException = e

msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))

msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')

msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))

msg.info('No memory monitor file to be analysed')
if 'ignorePatterns' in self.conf.argdict:
    igPat = self.conf.argdict['ignorePatterns'].value

if 'ignoreFiles' in self.conf.argdict:

    ignoreList=ignorePatterns)
worstError = self._logScan.worstError()
eventLoopWarnings = self._logScan.eventLoopWarnings()
if worstError['firstError']:
    if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
        if 'CoreDumpSvc' in worstError['firstError']['message']:
            exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
        elif 'G4Exception' in worstError['firstError']['message']:
            exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
        else:
            exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
    else:
        exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
else:
    exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
if deferredException is not None:

    if worstError['nLevel'] >= stdLogLevels['ERROR']:
        deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)

        deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])

    raise deferredException
if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))

    exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])

        'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))

if (len(eventLoopWarnings) > 0):
    msg.warning('Found WARNINGS in the event loop, as follows:')
    for element in eventLoopWarnings:
        msg.warning('{0} {1} ({2} instances)'.format(element['item']['service'], element['item']['message'], element['count']))

msg.info('Executor {0} has validated successfully'.format(self.name))
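# Handling of the 'CA' argument, which switches the substep to
# ComponentAccumulator-based (CA) configuration.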
if 'CA' not in self.conf.argdict:

if self.conf.argdict['CA'] is None:

if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
if 'athena' in self.conf.argdict:

currentSubstep = None
if 'athenaopts' in self.conf.argdict:

    if currentName in self.conf.argdict['athenaopts'].value:
        currentSubstep = currentName
        if self.substep in self.conf.argdict['athenaopts'].value:
            msg.info('Athenaopts found for {0} and {1}, joining options. '
                     'Consider changing your configuration to use just the name or the alias of the substep.'

            self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
            del self.conf.argdict['athenaopts'].value[self.substep]
            msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
    elif self.substep in self.conf.argdict['athenaopts'].value:

    elif 'all' in self.conf.argdict['athenaopts'].value:
        currentSubstep = 'all'
preLoadUpdated = dict()

preLoadUpdated[currentSubstep] = False
if 'athenaopts' in self.conf.argdict:
    if currentSubstep is not None:
        for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:

            if athArg.startswith('--preloadlib'):

                i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
                v = athArg.split('=', 1)[1]
                msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))

                self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
            except Exception as e:
                msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
            preLoadUpdated[currentSubstep] = True
if not preLoadUpdated[currentSubstep]:

if 'athenaopts' in self.conf.argdict:
    if currentSubstep is not None:

if 'athenaopts' in self.conf.argdict:
    if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:

    elif currentSubstep in self.conf.argdict['athenaopts'].value:
        self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])

if currentSubstep is None:
    currentSubstep = 'all'
msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
    msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
elif 'athenaopts' in self.conf.argdict:
    athenaConfigRelatedOpts = ['--config-only', '--drop-and-reload']

    if currentSubstep in self.conf.argdict['athenaopts'].value:
        conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
        if len(conflictOpts) > 0:
            msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))

            msg.info('Appending "--drop-and-reload" to athena options')

        msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))

    msg.info('Appending "--drop-and-reload" to athena options')

msg.info('Skipping test for "--drop-and-reload" in this executor')
if not ('athenaopts' in self.conf.argdict and
        any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):

if not ('athenaopts' in self.conf.argdict and
        any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):

msg.info('Updated script arguments with topoptions: %s', self._cmd)
setupATLAS = 'my_setupATLAS.sh'
with open(setupATLAS, 'w') as f:
    print("#!/bin/bash", file=f)
    print("""if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
    export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
fi
source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh""", file=f)
os.chmod(setupATLAS, 0o755)
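# The generated wrapper re-runs setupATLAS and asetup (optionally inside an
# OS container) before handing control to athena, so the substep can be
# reproduced by hand from the echoed instructions.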
    'Preparing wrapper file {wrapperFileName} with '
    'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(

container_cmd = None

print('#!/bin/sh', file=wrapper)

container_cmd = [ os.path.abspath(setupATLAS),

print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
print('echo " ' + " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
      file=wrapper)
print('echo " and use --pwd workdir, where workdir is the transform running directory within /srv"',
      file=wrapper)
print('echo', file=wrapper)
print(f'source ./{setupATLAS} -q', file=wfile)
print(f'asetup {asetup}', file=wfile)
print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
dbroot = path.dirname(dbsetup)
dbversion = path.basename(dbroot)
print("# DBRelease setup", file=wrapper)
print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
print("# AthenaMT explicitly disabled for this executor", file=wrapper)

print("# AthenaMP explicitly disabled for this executor", file=wrapper)

if not envSetting.startswith('LD_PRELOAD'):
    print("export", envSetting, file=wrapper)
# Profiling runs are two-stage: athena is first executed with --config-only
# to dump its configuration pickle, then Valgrind/VTune is launched on that
# serialised configuration.
if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
    msg.info('Valgrind engaged')

    AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(

    print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
    print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)

    if 'valgrindDefaultOpts' in self.conf._argdict:
        defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
    else:
        defaultOptions = True
    if 'valgrindExtraOpts' in self.conf._argdict:
        extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
    else:
        extraOptionsList = None
    msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
    msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
    command = ValgrindCommand(
        defaultOptions = defaultOptions,
        extraOptionsList = extraOptionsList,
        AthenaSerialisedConfigurationFile = \
            AthenaSerialisedConfigurationFile
    )
    msg.debug("Valgrind command: {command}".format(command = command))
    print(command, file=wrapper)
elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
    msg.info('VTune engaged')

    AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(

    print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
    print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)

    if 'vtuneDefaultOpts' in self.conf._argdict:
        defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
    else:
        defaultOptions = True
    if 'vtuneExtraOpts' in self.conf._argdict:
        extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
    else:
        extraOptionsList = None
    msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
    msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
    command = VTuneCommand(
        defaultOptions = defaultOptions,
        extraOptionsList = extraOptionsList,
        AthenaSerialisedConfigurationFile = \
            AthenaSerialisedConfigurationFile
    )
    msg.debug("VTune command: {command}".format(command = command))
    print(command, file=wrapper)
else:
    msg.info('Valgrind/VTune not engaged')
except OSError as e:
    errMsg = 'error writing athena wrapper {fileName}: {error}'.format(

        trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),

self._cmd = container_cmd + self._cmd
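# Greedy first-pass grouping of output files into merge candidates: a file is
# added to the current group while that brings the group size closer to
# mergeTargetSize (a negative target means merge everything into one group).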
if 'selfMerge' not in dir(fileArg):
    msg.info('Files in {0} cannot be merged (no selfMerge() method is implemented)'.format(fileArg.name))

if fileArg.mergeTargetSize == 0:
    msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))

mergeCandidates = [list()]
currentMergeSize = 0
for fname in fileArg.value:
    size = fileArg.getSingleMetadata(fname, 'file_size')
    if not isinstance(size, int):
        msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))

    if len(mergeCandidates[-1]) == 0:
        msg.debug('Adding file {0} to current empty merge list'.format(fname))
        mergeCandidates[-1].append(fname)
        currentMergeSize += size
    else:
        if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
            msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
            mergeCandidates[-1].append(fname)
            currentMergeSize += size
        else:
            msg.debug('Starting a new merge list with file {0}'.format(fname))
            mergeCandidates.append([fname])
            currentMergeSize = size

msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
if len(mergeCandidates) == 1:

    mergeNames = [fileArg.originalName]

for mergeGroup in mergeCandidates:

    mergeName = fileArg.originalName + '_{0}'.format(counter)
    while path.exists(mergeName):
        counter += 1
        mergeName = fileArg.originalName + '_{0}'.format(counter)
    mergeNames.append(mergeName)

for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
    msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
    if len(mergeGroup) <= 1:
        msg.info('Skip merging for single file')

    self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
if os.path.exists(targetTGZName):
    os.remove(targetTGZName)

fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")

tar = tarfile.open(targetTGZName, "w:gz")
for fName in os.listdir('.'):
    matches = fNameRE.findall(fName)
    if len(matches) > 0:
        if fNameRE.findall(fName)[0] == fName:
            msg.info('adding %s to %s', fName, targetTGZName)

msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
super(optionalAthenaExecutor, self).validate()

msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
             inData = set(), outData = set(), exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True,
             perfMonFile = None, tryDropAndReload = True, extraRunargs = {},
             manualDataDictionary = None, memMonitor = True):

super(POOLMergeExecutor, self).__init__(name, trf=trf, conf=conf, skeletonFile=skeletonFile, skeletonCA=skeletonCA,
                                        inData=inData, outData=outData, exe=exe, exeArgs=exeArgs, substep=substep,
                                        inputEventTest=inputEventTest, perfMonFile=perfMonFile,
                                        tryDropAndReload=tryDropAndReload, extraRunargs=extraRunargs,
                                        manualDataDictionary=manualDataDictionary, memMonitor=memMonitor)

super(POOLMergeExecutor, self).preExecute(input=input, output=output)

super(POOLMergeExecutor, self).execute()
msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
if 'NTUP_PILEUP' not in output:

    if 'formats' not in self.conf.argdict:
            'No derivation configuration specified')

    if ('DAOD' not in output) and ('D2AOD' not in output):
            'No base name for DAOD output')

    if 'formats' in self.conf.argdict:
        formatList = self.conf.argdict['formats'].value
    for reduction in formatList:
        if ('DAOD' in output):
            dataType = 'DAOD_' + reduction
            if 'augmentations' not in self.conf.argdict:
                outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
            else:
                for val in self.conf.argdict['augmentations'].value:
                    if reduction in val.split(':')[0]:
                        outputName = 'DAOD_' + val.split(':')[1] + '.' + self.conf.argdict['outputDAODFile'].value[0]

                    outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]

        if ('D2AOD' in output):
            dataType = 'D2AOD_' + reduction
            outputName = 'D2AOD_' + reduction + '.' + self.conf.argdict['outputD2AODFile'].value[0]

        msg.info('Adding reduction output type {0}'.format(dataType))
        output.add(dataType)

        self.conf.dataDictionary[dataType] = newReduction

    if ('DAOD' in output):
        output.remove('DAOD')
        del self.conf.dataDictionary['DAOD']
        del self.conf.argdict['outputDAODFile']
    if ('D2AOD' in output):
        output.remove('D2AOD')
        del self.conf.dataDictionary['D2AOD']
        del self.conf.argdict['outputD2AODFile']

    msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
    msg.info('Input/Output: {0}/{1}'.format(input, output))

msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
msg.info('Input/Output: {0}/{1}'.format(input, output))
super(reductionFrameworkExecutor, self).preExecute(input, output)
def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']),
             exe='DQHistogramMerge.py', exeArgs = [], memMonitor = True):

super(DQMergeExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                      exeArgs=exeArgs, memMonitor=memMonitor)

msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

super(DQMergeExecutor, self).preExecute(input=input, output=output)
for dataType in input:
    for fname in self.conf.dataDictionary[dataType].value:
        self.conf.dataDictionary[dataType]._getNumberOfEvents([fname])
        print(fname, file=DQMergeFile)

if len(output) != 1:
        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
outDataType = list(output)[0]

if (self.conf._argdict.get("run_post_processing", False)):

if (self.conf._argdict.get("is_incremental_merge", False)):

for k in ("excludeHist", "excludeDir"):
    if k in self.conf._argdict:
super(DQMergeExecutor, self).validate()

exitErrorMessage = ''

worstError = logScan.worstError()

if worstError['firstError']:
    if len(worstError['firstError']['message']) > logScan._msgLimit:
        exitErrorMessage = "Long {0} message at line {1}" \
                           " (see jobReport for further details)".format(worstError['level'],
                                                                         worstError['firstError']['firstLine'])

            worstError['firstError']['message'])
except OSError as e:
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))

if worstError['nLevel'] == stdLogLevels['ERROR'] and (
        'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')

elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')

msg.info('Executor {0} has validated successfully'.format(self.name))
def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']),
             exe='DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor = True):

super(DQMPostProcessExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                             exeArgs=exeArgs, memMonitor=memMonitor)

msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

super(DQMPostProcessExecutor, self).preExecute(input=input, output=output)
dsName = self.conf.argdict["inputHISTFile"].dataset

for dataType in input:
    for fname in self.conf.dataDictionary[dataType].value:

        if not dsName:
            dsName = ".".join(fname.split('.')[0:4])
        inputList.append("#".join([dsName, fname]))

if len(output) != 1:
        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
outDataType = list(output)[0]
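# Assemble the JSON parameter block consumed by DQM_Tier0Wrapper_tf.py;
# note that boolean switches are passed as the strings "True"/"False".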
wrapperParams = {"inputHistFiles": inputList,
                 "outputHistFile": dsName + "#" + self.conf.dataDictionary[outDataType].value[0],
                 "incrementalMode": "True" if self.conf._argdict.get("is_incremental_merge", False) else "False",
                 "postProcessing": "True" if self.conf._argdict.get("run_post_processing", False) else "False",
                 "doWebDisplay": "True" if self.conf._argdict.get("doWebDisplay", False) else "False",
                 "allowCOOLUpload": "True" if self.conf._argdict.get("allowCOOLUpload", False) else "False",

if "servers" in self.conf._argdict:
    wrapperParams["server"] = self.conf._argdict["servers"]

for k in ("excludeHist", "excludeDir"):
    if k in self.conf._argdict:
        wrapperParams["mergeParams"] += (" --{0}={1}".format(k, self.conf._argdict[k]))

with open("args.json", "w") as f:
    json.dump(wrapperParams, f)
super(DQMPostProcessExecutor, self).validate()

exitErrorMessage = ''

worstError = logScan.worstError()

if worstError['firstError']:
    if len(worstError['firstError']['message']) > logScan._msgLimit:
        exitErrorMessage = "Long {0} message at line {1}" \
                           " (see jobReport for further details)".format(worstError['level'],
                                                                         worstError['firstError']['firstLine'])

            worstError['firstError']['message'])
except OSError as e:
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))

if worstError['nLevel'] == stdLogLevels['ERROR'] and (
        'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')

elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')

msg.info('Executor {0} has validated successfully'.format(self.name))
msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

if self._exe is None:

if len(output) != 1:
        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
outDataType = list(output)[0]

for dataType in input:

super(NTUPMergeExecutor, self).preExecute(input=input, output=output)
if 'maskEmptyInputs' in self.conf.argdict and self.conf.argdict['maskEmptyInputs'].value is True:

    for fname in self.conf.dataDictionary[self._inputBS].value:
        nEvents = self.conf.dataDictionary[self._inputBS].getSingleMetadata(fname, 'nentries')
        msg.debug('Found {0} events in file {1}'.format(nEvents, fname))
        if isinstance(nEvents, int) and nEvents > 0:
            eventfullFiles.append(fname)

msg.info('The following input files are masked because they have 0 events: {0}'.format(' '.join(self._maskedFiles)))
if len(eventfullFiles) == 0:
    if 'emptyStubFile' in self.conf.argdict and path.exists(self.conf.argdict['emptyStubFile'].value):

        msg.info("All input files are empty - will use stub file {0} as output".format(self.conf.argdict['emptyStubFile'].value))

            'All input files had zero events - aborting BS merge')
for fname in self.conf.dataDictionary[self._inputBS].value:

    print(fname, file=BSFileList)
except OSError as e:

elif self.conf.argdict['allowRename'].value is True:

    msg.info('Output filename does not end in "._0001.data"; will proceed, but be aware that the internal filename metadata will be wrong')

errmsg = 'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'

super(bsMergeExecutor, self).preExecute(input=input, output=output)

msg.info("Using stub file for empty BS output - execution is fake")

super(bsMergeExecutor, self).execute()

except OSError as e:
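# The archive executor writes a small zip_wrapper.py helper script and runs
# it with python; entries are stored with ZIP_STORED, i.e. the inputs are
# not recompressed inside the archive.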
if 'outputArchFile' not in self.conf.argdict:

with open('zip_wrapper.py', 'w') as zip_wrapper:
    print("import zipfile, os, shutil", file=zip_wrapper)
    if os.path.exists(self.conf.argdict['outputArchFile'].value[0]):
        print("zf = zipfile.ZipFile('{}', mode='a', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
    else:
        print("zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
    print("for f in {}:".format(self.conf.argdict['inputDataFile'].value), file=zip_wrapper)

    print("    if zipfile.is_zipfile(f) and '.zip' in f:", file=zip_wrapper)
    print("        archive = zipfile.ZipFile(f, mode='r')", file=zip_wrapper)
    print("        print('Extracting input zip file {0} to temporary directory {1}'.format(f, 'tmp'))", file=zip_wrapper)
    print("        archive.extractall('tmp')", file=zip_wrapper)
    print("        archive.close()", file=zip_wrapper)

    print("        if os.access(f, os.F_OK):", file=zip_wrapper)
    print("            print('Removing input zip file {}'.format(f))", file=zip_wrapper)
    print("            os.unlink(f)", file=zip_wrapper)
    print("        if os.path.isdir('tmp'):", file=zip_wrapper)
    print("            for root, dirs, files in os.walk('tmp'):", file=zip_wrapper)
    print("                for name in files:", file=zip_wrapper)
    print("                    print('Zipping {}'.format(name))", file=zip_wrapper)
    print("                    zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
    print("            shutil.rmtree('tmp')", file=zip_wrapper)
    print("    else:", file=zip_wrapper)
    print("        print('Zipping {}'.format(os.path.basename(f)))", file=zip_wrapper)
    print("        zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
    print("        if os.access(f, os.F_OK):", file=zip_wrapper)
    print("            print('Removing input file {}'.format(f))", file=zip_wrapper)
    print("            os.unlink(f)", file=zip_wrapper)
    print("zf.close()", file=zip_wrapper)
os.chmod('zip_wrapper.py', 0o755)
except OSError as e:
    errMsg = 'error writing zip wrapper {fileName}: {error}'.format(fileName = 'zip_wrapper.py',
elif self._exe == 'unarchive':

    for infile in self.conf.argdict['inputArchFile'].value:
        if not zipfile.is_zipfile(infile):
                'An input file is not a zip archive - aborting unpacking')
    self._cmd = ['python']

    with open('unarchive_wrapper.py', 'w') as unarchive_wrapper:
        print("import zipfile", file=unarchive_wrapper)
        print("for f in {}:".format(self.conf.argdict['inputArchFile'].value), file=unarchive_wrapper)
        print("    archive = zipfile.ZipFile(f, mode='r')", file=unarchive_wrapper)
        print("    path = '{}'".format(self.conf.argdict['path']), file=unarchive_wrapper)
        print("    print('Extracting archive {0} to {1}'.format(f, path))", file=unarchive_wrapper)
        print("    archive.extractall(path)", file=unarchive_wrapper)
        print("    archive.close()", file=unarchive_wrapper)
    os.chmod('unarchive_wrapper.py', 0o755)
    except OSError as e:
        errMsg = 'error writing unarchive wrapper {fileName}: {error}'.format(fileName = 'unarchive_wrapper.py',

super(archiveExecutor, self).preExecute(input=input, output=output)