import os.path as path
from fnmatch import fnmatch
msg = logging.getLogger(__name__)
    cvmfsDBReleaseCheck, forceToAlphaNum, \
    ValgrindCommand, VTuneCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix
import PyJobTransforms.trfExceptions as trfExceptions
import PyJobTransforms.trfEnv as trfEnv
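# Helper fragment below: when a stream reports a plain ASCII/ANSI encoding, it is
# reopened as UTF-8 so that payload output with non-ASCII characters cannot raise
# encoding errors in the logging machinery.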
enc = s.encoding.lower()
if enc.find('ascii') >= 0 or enc.find('ansi') >= 0:
    return open(s.fileno(), 'w', encoding='utf-8')
def __init__(self, argdict={}, dataDictionary={}, firstExecutor=False):
@dataDictionary.setter
@totalExecutorSteps.setter
def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
if len(dataOverlap) > 0:
        'Executor definition error, executor {0} is not allowed to produce and consume the same datatypes. Duplicated input/output types {1}'.format(self._name, ' '.join(dataOverlap)))
self.conf.setFromTransform(trf)
if '_substep' in dir(self):
if '_trf' in dir(self):
if '_inData' in dir(self):
if '_inData' in dir(self):
if '_outData' in dir(self):
if '_outData' in dir(self):
if '_input' in dir(self):
if '_output' in dir(self):
if hasattr(self, '_first'):
msg.info('Preexecute for %s', self._name)
msg.info('Starting execution of %s', self._name)
msg.info('%s executor returns %d', self._name, self._rc)
msg.info('Postexecute for %s', self._name)
msg.info('Executor %s has no validation function - assuming all ok', self._name)
super(logscanExecutor, self).__init__(name=name)
msg.info('Preexecute for %s', self._name)
if 'logfile' in self.conf.argdict:
msg.info("Starting validation for {0}".format(self._name))
if 'ignorePatterns' in self.conf.argdict:
    igPat = self.conf.argdict['ignorePatterns'].value
if 'ignoreFiles' in self.conf.argdict:
worstError = self._logScan.worstError()
if worstError['firstError']:
    if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
        if 'CoreDumpSvc' in worstError['firstError']['message']:
            exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
        elif 'G4Exception' in worstError['firstError']['message']:
            exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
            exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
        exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
    exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
        'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
msg.info('Executor {0} has validated successfully'.format(self.name))
super(echoExecutor, self).__init__(name=name, trf=trf)
msg.info('Starting execution of %s', self._name)
msg.info('Transform argument dictionary now follows:')
for k, v in self.conf.argdict.items():
    print("%s = %s" % (k, v))
msg.info('%s executor returns %d', self._name, self._rc)
def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()):
super(dummyExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
msg.info('Starting execution of %s', self._name)
for k, v in self.conf.argdict.items():
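    # Create each requested dummy output by simply touching the filename into existence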
    msg.info('Creating dummy output file: {0}'.format(self.conf.argdict[k].value[0]))
    open(self.conf.argdict[k].value[0], 'a').close()
msg.info('%s executor returns %d', self._name, self._rc)
def __init__(self, name = 'Script', trf = None, conf = None, inData = set(), outData = set(),
             exe = None, exeArgs = None, memMonitor = True):
super(scriptExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData)
msg.debug('scriptExecutor: Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
if self._cmd is None:
msg.info('Will execute script as %s', self._cmd)
if 'TRF_ECHO' in os.environ:
    msg.info('TRF_ECHO envvar is set - enabling command echoing to stdout')
elif 'TRF_NOECHO' in os.environ:
    msg.info('TRF_NOECHO envvar is set - disabling command echoing to stdout')
    msg.info('Interactive environment detected (stdio or stdout is a tty) - enabling command echoing to stdout')
elif 'TZHOME' in os.environ:
    msg.info('Tier-0 environment detected - enabling command echoing to stdout')
    msg.info('Batch/grid running - command outputs will not be echoed. Logs for {0} are in {1}'.format(self._name, self._logFileName))
encargs = {'encoding': 'utf-8'}
self._exeLogFile.setFormatter(logging.Formatter('%(asctime)s %(message)s', datefmt='%H:%M:%S'))
self._echostream.setFormatter(logging.Formatter('%(name)s %(asctime)s %(message)s', datefmt='%H:%M:%S'))
    'No executor set in {0}'.format(self.__class__.__name__))
if arg in self.conf.argdict:
if isinstance(self.conf.argdict[arg].value, list):
msg.info('Starting execution of {0} ({1})'.format(self._name, self._cmd))
if ('execOnly' in self.conf.argdict and self.conf.argdict['execOnly'] is True):
    msg.info('execOnly flag is set - execution will now switch, replacing the transform')
encargs = {'encoding': 'utf8'}
msg.info("chdir /srv to launch a nested container for the substep")
p = subprocess.Popen(self._cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1, **encargs)
msg.info("chdir {} after launching the nested container".format(self._workdir))
os.chdir(self._workdir)
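# Launch prmon as a sidecar process that samples the payload's resource usage by PID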
memMonitorCommand = ['prmon', '--pid', str(p.pid), '--filename', 'prmon.full.' + self._name,
mem_proc = subprocess.Popen(memMonitorCommand, shell = False, close_fds=True, **encargs)
except Exception as e:
    msg.warning('Failed to spawn memory monitor for {0}: {1}'.format(self._name, e))
while p.poll() is None:
    line = p.stdout.readline()
for line in p.stdout:
msg.info('%s executor returns %d', self._name, self._rc)
errMsg = 'Execution of {0} failed and raised OSError: {1}'.format(self._cmd[0], e)
mem_proc.send_signal(signal.SIGUSR1)
while (not mem_proc.poll()) and countWait < 10:
except Exception as e:
msg.info('Executor {0} validated successfully (return code {1})'.format(self._name, self._rc))
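# A failing return code that maps to a known signal name gets a more specific error message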
if trfExit.codeToSignalname(self._rc) != "":
    self._errMsg = '{0} got a {1} signal (exit code {2})'.format(self._name, trfExit.codeToSignalname(self._rc), self._rc)
self._errMsg = 'Non-zero return code from %s (%d)' % (self._name, self._rc)
if 'checkEventCount' in self.conf.argdict and self.conf.argdict['checkEventCount'].returnMyValue(exe=self) is False:
    msg.info('Event counting for substep {0} is skipped'.format(self.name))
msg.info('Event counting for substep {0} passed'.format(self.name))
_exitMessageLimit = 200
_defaultIgnorePatternFile = ['atlas_error_mask.db']
def __init__(self, name = 'athena', trf = None, conf = None, skeletonFile=None, skeletonCA=None,
             inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'],
             substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = {}, runtimeRunargs = {},
             literalRunargs = [], dataArgs = [], checkEventCount = False, errorMaskFiles = None,
             manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False, onlyMP = False, onlyMT = False, onlyMPWithRunargs = None):
msg.debug("Resource monitoring from PerfMon is now deprecated")
if isinstance(skeletonFile, str):
super(athenaExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                     exeArgs=exeArgs, memMonitor=memMonitor)
self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 792052 2017-01-13 13:36:51Z mavogel $')
@inputDataTypeCountCheck.setter
msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
if self.conf.dataDictionary[dataType].nentries == 'UNDEFINED':
thisInputEvents = self.conf.dataDictionary[dataType].nentries
if thisInputEvents > inputEvents:
    inputEvents = thisInputEvents
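# Derive the number of events this substep should actually process from skipEvents
# and maxEvents, taking per-substep values via returnMyValue()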
if ('skipEvents' in self.conf.argdict and
    self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
    mySkipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
if ('maxEvents' in self.conf.argdict and
    self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
    myMaxEvents = self.conf.argdict['maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
        'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))
if (myMaxEvents != -1):
    expectedEvents = myMaxEvents
    expectedEvents = min(inputEvents - mySkipEvents, myMaxEvents)
expectedEvents = inputEvents - mySkipEvents
msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
OSSetupString = None
legacyThreadingRelease = False
if 'asetup' in self.conf.argdict:
    asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
if asetupString is not None:
    currentOS = os.environ['ALRB_USER_PLATFORM']
    if legacyOSRelease and "centos7" not in currentOS:
        OSSetupString = "centos7"
        msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
if 'runInContainer' in self.conf.argdict:
    OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
    msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
if OSSetupString is not None and asetupString is None:
        '--asetup must be used for the substep which requires --runInContainer')
if k in self.conf._argdict:
if ((('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or
     ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value)) and
    ('ATHENA_CORE_NUMBER' not in os.environ)):
    msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
msg.info("This configuration does not support MT, falling back to MP")
msg.info("This configuration does not support MP, using MT")
msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
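# When the transform is split into several executor steps, tag each step's output
# files with a unique executorStep suffix so the steps do not overwrite each other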
if self.conf.totalExecutorSteps > 1:
    for dataType in output:
        if self.conf._dataDictionary[dataType].originalName:
            self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
        self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
        self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
        msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True:
if ('athenaMPStrategy' in self.conf.argdict and
    (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)):
if 'athenaMPMergeTargetSize' in self.conf.argdict:
    for dataType in output:
        if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value:
            self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000
            msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
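        # No exact dataType match: try fnmatch-style globs, then the "ALL" catch-all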
        matchedViaGlob = False
        for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.items():
            if fnmatch(dataType, mtsType):
                self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000
                msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
                matchedViaGlob = True
        if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value:
            self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000
            msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
for dataType in output:
    if self.conf.totalExecutorSteps <= 1:
        self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
    if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
        if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
            msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
        self.conf._dataDictionary[dataType].value[0] += "_000"
        msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
for dataType in input:
    inputFiles[dataType] = self.conf.dataDictionary[dataType]
outputFiles = dict()
for dataType in output:
    outputFiles[dataType] = self.conf.dataDictionary[dataType]
for dataType, dataArg in self.conf.dataDictionary.items():
    if isinstance(dataArg, list) and dataArg:
        if self.conf.totalExecutorSteps <= 1:
            raise ValueError('Multiple input arguments provided but only running one substep')
        if self.conf.totalExecutorSteps != len(dataArg):
            raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
        if dataArg[self.conf.executorStep].io == 'input' and nameForFiles in dataArg[self.conf.executorStep].executor:
            inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
        if dataArg.io == 'input' and nameForFiles in dataArg.executor:
            inputFiles[dataArg.subtype] = dataArg
msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
                                              output = outputFiles)
dbrelease = dbsetup = None
if 'DBRelease' in self.conf.argdict:
    dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
    if path.islink(dbrelease):
        dbrelease = path.realpath(dbrelease)
    dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
    msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
    if not os.access(dbrelease, os.R_OK):
        msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
        msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
        dbrelease = dbdMatch.group(1)
    msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
    unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
super(athenaExecutor, self).preExecute(input, output)
msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
if self.conf.totalExecutorSteps > 1:
    outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
    for i in range(self.conf.totalExecutorSteps):
        for v in self.conf.dataDictionary[dataType].value:
            newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
                                      '_{0}{1}_'.format(executorStepSuffix, i)))
    self.conf.dataDictionary[dataType].multipleOK = True
    for i in range(self.conf.totalExecutorSteps):
        newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
    self.conf.dataDictionary[dataType].value = newValue
    if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
skipFileChecks = False
if 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value:
    if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
if 'TXT_JIVEXMLTGZ' in self.conf.dataDictionary:
msg.info('scanning {0} for reporting events passed the filter ISF_SimEventFilter'.format(self._logFileName))
if ('deleteIntermediateOutputfiles' in self.conf._argdict and self.conf._argdict['deleteIntermediateOutputfiles'].value):
    inputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._input ])
    for k, v in inputDataDictionary.items():
        if not v.io == 'temporary':
        for filename in v.value:
            if os.access(filename, os.R_OK) and not filename.startswith("/cvmfs"):
                msg.info("Removing intermediate {0} input file {1}".format(k, filename))
                if (os.path.realpath(filename) != filename):
                    targetpath = os.path.realpath(filename)
                    if (targetpath) and os.access(targetpath, os.R_OK):
                        os.unlink(targetpath)
deferredException = None
memLeakThreshold = 5000
super(athenaExecutor, self).validate()
msg.error('Validation of return code failed: {0!s}'.format(e))
deferredException = e
msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
msg.info('No memory monitor file to be analysed')
if 'ignorePatterns' in self.conf.argdict:
    igPat = self.conf.argdict['ignorePatterns'].value
if 'ignoreFiles' in self.conf.argdict:
                             ignoreList=ignorePatterns)
worstError = self._logScan.worstError()
if worstError['firstError']:
    if len(worstError['firstError']['message']) > athenaExecutor._exitMessageLimit:
        if 'CoreDumpSvc' in worstError['firstError']['message']:
            exitErrorMessage = "Core dump at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
        elif 'G4Exception' in worstError['firstError']['message']:
            exitErrorMessage = "G4 exception at line {0} (see jobReport for further details)".format(worstError['firstError']['firstLine'])
            exitErrorMessage = "Long {0} message at line {1} (see jobReport for further details)".format(worstError['level'], worstError['firstError']['firstLine'])
        exitErrorMessage = "Logfile error in {0}: \"{1}\"".format(self._logFileName, worstError['firstError']['message'])
    exitErrorMessage = "Error level {0} found (see athena logfile for details)".format(worstError['level'])
if deferredException is not None:
    if worstError['nLevel'] >= stdLogLevels['ERROR']:
        deferredException.errMsg = deferredException.errMsg + "; {0}".format(exitErrorMessage)
        deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
    raise deferredException
if worstError['nLevel'] == stdLogLevels['ERROR'] and ('ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in athena logfile (level {0})'.format(worstError['level']))
    exitErrorMessage = exitErrorMessage + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
        'Fatal error in athena logfile: "{0}"'.format(exitErrorMessage))
msg.info('Executor {0} has validated successfully'.format(self.name))
if 'CA' not in self.conf.argdict:
if self.conf.argdict['CA'] is None:
if self.conf.argdict['CA'].returnMyValue(name=self.name, substep=self.substep) is True:
if 'athena' in self.conf.argdict:
currentSubstep = None
if 'athenaopts' in self.conf.argdict:
    if currentName in self.conf.argdict['athenaopts'].value:
        currentSubstep = currentName
        if self.substep in self.conf.argdict['athenaopts'].value:
            msg.info('Athenaopts found for {0} and {1}, joining options. '
                     'Consider changing your configuration to use just the name or the alias of the substep.'
            self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep])
            del self.conf.argdict['athenaopts'].value[self.substep]
            msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value))
    elif self.substep in self.conf.argdict['athenaopts'].value:
    elif 'all' in self.conf.argdict['athenaopts'].value:
        currentSubstep = 'all'
preLoadUpdated = dict()
preLoadUpdated[currentSubstep] = False
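# If the user already passed --preloadlib via athenaopts, merge the transform's own
# LD_PRELOAD additions into that option instead of exporting a conflicting variable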
if 'athenaopts' in self.conf.argdict:
    if currentSubstep is not None:
        for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]:
            if athArg.startswith('--preloadlib'):
                i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg)
                v = athArg.split('=', 1)[1]
                msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name))
                self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads)
                except Exception as e:
                    msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e))
                preLoadUpdated[currentSubstep] = True
if not preLoadUpdated[currentSubstep]:
if 'athenaopts' in self.conf.argdict:
    if currentSubstep is not None:
if 'athenaopts' in self.conf.argdict:
    if currentSubstep is None and "all" in self.conf.argdict['athenaopts'].value:
    elif currentSubstep in self.conf.argdict['athenaopts'].value:
        self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep])
if currentSubstep is None:
    currentSubstep = 'all'
msg.info('ignoring "--drop-and-reload" for CA-based transforms, config cleaned up anyway')
elif 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
    msg.info('Disabling "--drop-and-reload" because the job is configured to use Valgrind')
elif 'athenaopts' in self.conf.argdict:
    athenaConfigRelatedOpts = ['--config-only', '--drop-and-reload']
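    # '--drop-and-reload' must not be appended when the user already steers athena's
    # config handling; check the substep's athenaopts for conflicting options first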
    if currentSubstep in self.conf.argdict['athenaopts'].value:
        conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]]))
        if len(conflictOpts) > 0:
            msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts)))
            msg.info('Appending "--drop-and-reload" to athena options')
        msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name))
    msg.info('Appending "--drop-and-reload" to athena options')
msg.info('Skipping test for "--drop-and-reload" in this executor')
if not ('athenaopts' in self.conf.argdict and
        any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
if not ('athenaopts' in self.conf.argdict and
        any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])):
msg.info('Updated script arguments with topoptions: %s', self._cmd)
setupATLAS = 'my_setupATLAS.sh'
with open(setupATLAS, 'w') as f:
    print("#!/bin/bash", file=f)
if [ -z $ATLAS_LOCAL_ROOT_BASE ]; then
    export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase
fi
source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh"""
os.chmod(setupATLAS, 0o755)
    'Preparing wrapper file {wrapperFileName} with '
    'asetup={asetupStatus} and dbsetup={dbsetupStatus}'.format(
container_cmd = None
print('#!/bin/sh', file=wrapper)
container_cmd = [ os.path.abspath(setupATLAS),
print('echo "This wrapper is executed within a container! For a local re-run, do:"', file=wrapper)
print('echo " ' + " ".join(['setupATLAS'] + container_cmd[1:] + [path.join('.', self._wrapperFile)]) + '"', file=wrapper)
print('echo "N.B.: if launching a nested container, navigate to /srv before running the above command"',
print('echo "      and use --pwd workdir, where workdir is the transform running directory within /srv"',
print('echo', file=wrapper)
print(f'source ./{setupATLAS} -q', file=wfile)
print(f'asetup {asetup}', file=wfile)
print('if [ ${?} != "0" ]; then exit 255; fi', file=wfile)
dbroot = path.dirname(dbsetup)
dbversion = path.basename(dbroot)
print("# DBRelease setup", file=wrapper)
print('echo Setting up DBRelease {dbroot} environment'.format(dbroot = dbroot), file=wrapper)
print('export DBRELEASE={dbversion}'.format(dbversion = dbversion), file=wrapper)
print('export CORAL_AUTH_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper)
print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper)
print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper)
print("# AthenaMT explicitly disabled for this executor", file=wrapper)
print("# AthenaMP explicitly disabled for this executor", file=wrapper)
if not envSetting.startswith('LD_PRELOAD'):
    print("export", envSetting, file=wrapper)
if 'valgrind' in self.conf._argdict and self.conf._argdict['valgrind'].value is True:
    msg.info('Valgrind engaged')
    AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
    print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
    print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
    if 'valgrindDefaultOpts' in self.conf._argdict:
        defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value
        defaultOptions = True
    if 'valgrindExtraOpts' in self.conf._argdict:
        extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value
        extraOptionsList = None
    msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions))
    msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList))
        defaultOptions = defaultOptions,
        extraOptionsList = extraOptionsList,
        AthenaSerialisedConfigurationFile = AthenaSerialisedConfigurationFile
    msg.debug("Valgrind command: {command}".format(command = command))
    print(command, file=wrapper)
elif 'vtune' in self.conf._argdict and self.conf._argdict['vtune'].value is True:
    msg.info('VTune engaged')
    AthenaSerialisedConfigurationFile = "{name}Conf.pkl".format(
    print(' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile), file=wrapper)
    print('if [ $? != "0" ]; then exit 255; fi', file=wrapper)
    if 'vtuneDefaultOpts' in self.conf._argdict:
        defaultOptions = self.conf._argdict['vtuneDefaultOpts'].value
        defaultOptions = True
    if 'vtuneExtraOpts' in self.conf._argdict:
        extraOptionsList = self.conf._argdict['vtuneExtraOpts'].value
        extraOptionsList = None
    msg.debug("requested VTune command basic options: {options}".format(options = defaultOptions))
    msg.debug("requested VTune command extra options: {options}".format(options = extraOptionsList))
        defaultOptions = defaultOptions,
        extraOptionsList = extraOptionsList,
        AthenaSerialisedConfigurationFile = AthenaSerialisedConfigurationFile
    msg.debug("VTune command: {command}".format(command = command))
    print(command, file=wrapper)
msg.info('Valgrind/VTune not engaged')
except OSError as e:
    errMsg = 'error writing athena wrapper {fileName}: {error}'.format(
        trfExit.nameToCode('TRF_EXEC_SETUP_WRAPPER'),
self._cmd = container_cmd + self._cmd
if 'selfMerge' not in dir(fileArg):
    msg.info('Files in {0} cannot be merged (no selfMerge() method is implemented)'.format(fileArg.name))
if fileArg.mergeTargetSize == 0:
    msg.info('Files in {0} will not be merged as target size is set to 0'.format(fileArg.name))
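# Greedy first-pass packing: walk the input files in order, adding each file to the
# current group while that brings the group closer to mergeTargetSize, otherwise
# starting a new group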
mergeCandidates = [list()]
currentMergeSize = 0
for fname in fileArg.value:
    size = fileArg.getSingleMetadata(fname, 'file_size')
    if not isinstance(size, int):
        msg.warning('File size metadata for {0} was not correct, found type {1}. Aborting merge attempts.'.format(fileArg, type(size)))
    if len(mergeCandidates[-1]) == 0:
        msg.debug('Adding file {0} to current empty merge list'.format(fname))
        mergeCandidates[-1].append(fname)
        currentMergeSize += size
    if fileArg.mergeTargetSize < 0 or math.fabs(currentMergeSize + size - fileArg.mergeTargetSize) < math.fabs(currentMergeSize - fileArg.mergeTargetSize):
        msg.debug('Adding file {0} to merge list {1} as it gets closer to the target size'.format(fname, mergeCandidates[-1]))
        mergeCandidates[-1].append(fname)
        currentMergeSize += size
        msg.debug('Starting a new merge list with file {0}'.format(fname))
        mergeCandidates.append([fname])
        currentMergeSize = size
msg.debug('First pass splitting will merge files in this way: {0}'.format(mergeCandidates))
if len(mergeCandidates) == 1:
    mergeNames = [fileArg.originalName]
for mergeGroup in mergeCandidates:
    mergeName = fileArg.originalName + '_{0}'.format(counter)
    while path.exists(mergeName):
        mergeName = fileArg.originalName + '_{0}'.format(counter)
    mergeNames.append(mergeName)
for targetName, mergeGroup, counter in zip(mergeNames, mergeCandidates, list(range(len(mergeNames)))):
    msg.info('Want to merge files {0} to {1}'.format(mergeGroup, targetName))
    if len(mergeGroup) <= 1:
        msg.info('Skip merging for single file')
    self._myMerger.append(fileArg.selfMerge(output=targetName, inputs=mergeGroup, counter=counter, argdict=self.conf.argdict))
targetTGZName = self.conf.dataDictionary['TXT_JIVEXMLTGZ'].value[0]
if os.path.exists(targetTGZName):
    os.remove(targetTGZName)
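# Collect all files matching JiveXML_<digits>_<digits>.xml from the working
# directory into a single gzipped tar archive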
fNameRE = re.compile(r"JiveXML\_\d+\_\d+.xml")
tar = tarfile.open(targetTGZName, "w:gz")
for fName in os.listdir('.'):
    matches = fNameRE.findall(fName)
    if len(matches) > 0:
        if fNameRE.findall(fName)[0] == fName:
            msg.info('adding %s to %s', fName, targetTGZName)
msg.info('JiveXML compression: %s has been written and closed.', targetTGZName)
super(optionalAthenaExecutor, self).validate()
msg.warning('Validation failed for {0}: {1}'.format(self._name, e))
def __init__(self, name = 'hybridPOOLMerge', trf = None, conf = None, skeletonFile=None, skeletonCA='RecJobTransforms.MergePool_Skeleton',
             inData = set(), outData = set(), exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True,
             perfMonFile = None, tryDropAndReload = True, extraRunargs = {},
             manualDataDictionary = None, memMonitor = True):
super(POOLMergeExecutor, self).__init__(name, trf=trf, conf=conf, skeletonFile=skeletonFile, skeletonCA=skeletonCA,
                                        inData=inData, outData=outData, exe=exe, exeArgs=exeArgs, substep=substep,
                                        inputEventTest=inputEventTest, perfMonFile=perfMonFile,
                                        tryDropAndReload=tryDropAndReload, extraRunargs=extraRunargs,
                                        manualDataDictionary=manualDataDictionary, memMonitor=memMonitor)
super(POOLMergeExecutor, self).preExecute(input=input, output=output)
super(POOLMergeExecutor, self).execute()
msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
if 'NTUP_PILEUP' not in output:
    if 'formats' not in self.conf.argdict:
            'No derivation configuration specified')
    if ('DAOD' not in output) and ('D2AOD' not in output):
            'No base name for DAOD output')
if 'formats' in self.conf.argdict:
    formatList = self.conf.argdict['formats'].value
for reduction in formatList:
    if ('DAOD' in output):
        dataType = 'DAOD_' + reduction
        if 'augmentations' not in self.conf.argdict:
            outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
            for val in self.conf.argdict['augmentations'].value:
                if reduction in val.split(':')[0]:
                    outputName = 'DAOD_' + val.split(':')[1] + '.' + self.conf.argdict['outputDAODFile'].value[0]
                outputName = 'DAOD_' + reduction + '.' + self.conf.argdict['outputDAODFile'].value[0]
    if ('D2AOD' in output):
        dataType = 'D2AOD_' + reduction
        outputName = 'D2AOD_' + reduction + '.' + self.conf.argdict['outputD2AODFile'].value[0]
    msg.info('Adding reduction output type {0}'.format(dataType))
    output.add(dataType)
    self.conf.dataDictionary[dataType] = newReduction
if ('DAOD' in output):
    output.remove('DAOD')
    del self.conf.dataDictionary['DAOD']
    del self.conf.argdict['outputDAODFile']
if ('D2AOD' in output):
    output.remove('D2AOD')
    del self.conf.dataDictionary['D2AOD']
    del self.conf.argdict['outputD2AODFile']
msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
msg.info('Input/Output: {0}/{1}'.format(input, output))
msg.info('Data dictionary is now: {0}'.format(self.conf.dataDictionary))
msg.info('Input/Output: {0}/{1}'.format(input, output))
super(reductionFrameworkExecutor, self).preExecute(input, output)
def __init__(self, name='DQHistMerge', trf=None, conf=None, inData=set(['HIST_AOD', 'HIST_ESD']), outData=set(['HIST']),
             exe='DQHistogramMerge.py', exeArgs = [], memMonitor = True):
super(DQMergeExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                      exeArgs=exeArgs, memMonitor=memMonitor)
msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
super(DQMergeExecutor, self).preExecute(input=input, output=output)
for dataType in input:
    for fname in self.conf.dataDictionary[dataType].value:
        self.conf.dataDictionary[dataType]._getNumberOfEvents([fname])
        print(fname, file=DQMergeFile)
if len(output) != 1:
        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
outDataType = list(output)[0]
if (self.conf._argdict.get("run_post_processing", False)):
if (self.conf._argdict.get("is_incremental_merge", False)):
for k in ("excludeHist", "excludeDir"):
    if k in self.conf._argdict:
super(DQMergeExecutor, self).validate()
exitErrorMessage = ''
worstError = logScan.worstError()
if worstError['firstError']:
    if len(worstError['firstError']['message']) > logScan._msgLimit:
        exitErrorMessage = "Long {0} message at line {1}" \
                           " (see jobReport for further details)".format(worstError['level'],
                                                                         worstError['firstError']['firstLine'])
        worstError['firstError']['message'])
except OSError as e:
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))
if worstError['nLevel'] == stdLogLevels['ERROR'] and (
        'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
msg.info('Executor {0} has validated successfully'.format(self.name))
def __init__(self, name='DQMPostProcess', trf=None, conf=None, inData=set(['HIST']), outData=set(['HIST']),
             exe='DQM_Tier0Wrapper_tf.py', exeArgs = [], memMonitor = True):
super(DQMPostProcessExecutor, self).__init__(name=name, trf=trf, conf=conf, inData=inData, outData=outData, exe=exe,
                                             exeArgs=exeArgs, memMonitor=memMonitor)
msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
super(DQMPostProcessExecutor, self).preExecute(input=input, output=output)
dsName = self.conf.argdict["inputHISTFile"].dataset
for dataType in input:
    for fname in self.conf.dataDictionary[dataType].value:
        if not dsName:
            dsName = ".".join(fname.split('.')[0:4])
        inputList.append("#".join([dsName, fname]))
if len(output) != 1:
        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
outDataType = list(output)[0]
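# Assemble the parameter dictionary handed to DQM_Tier0Wrapper_tf.py via args.json;
# the wrapper expects boolean switches as the strings "True"/"False" and files as
# dataset#filename pairs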
wrapperParams = {"inputHistFiles": inputList,
                 "outputHistFile": dsName + "#" + self.conf.dataDictionary[outDataType].value[0],
                 "incrementalMode": "True" if self.conf._argdict.get("is_incremental_merge", False) else "False",
                 "postProcessing": "True" if self.conf._argdict.get("run_post_processing", False) else "False",
                 "doWebDisplay": "True" if self.conf._argdict.get("doWebDisplay", False) else "False",
                 "allowCOOLUpload": "True" if self.conf._argdict.get("allowCOOLUpload", False) else "False",
if "servers" in self.conf._argdict:
    wrapperParams["server"] = self.conf._argdict["servers"]
for k in ("excludeHist", "excludeDir"):
    if k in self.conf._argdict:
        wrapperParams["mergeParams"] += (" --{0}={1}".format(k, self.conf._argdict[k]))
with open("args.json", "w") as f:
    json.dump(wrapperParams, f)
super(DQMPostProcessExecutor, self).validate()
exitErrorMessage = ''
worstError = logScan.worstError()
if worstError['firstError']:
    if len(worstError['firstError']['message']) > logScan._msgLimit:
        exitErrorMessage = "Long {0} message at line {1}" \
                           " (see jobReport for further details)".format(worstError['level'],
                                                                         worstError['firstError']['firstLine'])
        worstError['firstError']['message'])
except OSError as e:
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
        'Exception raised while attempting to scan logfile {0}: {1}'.format(self._logFileName, e))
if worstError['nLevel'] == stdLogLevels['ERROR'] and (
        'ignoreErrors' in self.conf.argdict and self.conf.argdict['ignoreErrors'].value is True):
    msg.warning('Found ERRORs in the logfile, but ignoring this as ignoreErrors=True (see jobReport for details)')
elif worstError['nLevel'] >= stdLogLevels['ERROR']:
    msg.error('Fatal error in script logfile (level {0})'.format(worstError['level']))
    exitCode = trfExit.nameToCode('TRF_EXEC_LOGERROR')
msg.info('Executor {0} has validated successfully'.format(self.name))
msg.debug('[NTUP] Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
if self._exe is None:
if len(output) != 1:
        'One (and only one) output file must be given to {0} (got {1})'.format(self.name, len(output)))
outDataType = list(output)[0]
for dataType in input:
super(NTUPMergeExecutor, self).preExecute(input=input, output=output)
if 'maskEmptyInputs' in self.conf.argdict and self.conf.argdict['maskEmptyInputs'].value is True:
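    # Keep only input bytestream files that actually contain events; empty files are
    # masked out of the merge and reported below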
    for fname in self.conf.dataDictionary[self._inputBS].value:
        nEvents = self.conf.dataDictionary[self._inputBS].getSingleMetadata(fname, 'nentries')
        msg.debug('Found {0} events in file {1}'.format(nEvents, fname))
        if isinstance(nEvents, int) and nEvents > 0:
            eventfullFiles.append(fname)
msg.info('The following input files are masked because they have 0 events: {0}'.format(' '.join(self._maskedFiles)))
if len(eventfullFiles) == 0:
    if 'emptyStubFile' in self.conf.argdict and path.exists(self.conf.argdict['emptyStubFile'].value):
        msg.info("All input files are empty - will use stub file {0} as output".format(self.conf.argdict['emptyStubFile'].value))
            'All input files had zero events - aborting BS merge')
for fname in self.conf.dataDictionary[self._inputBS].value:
    print(fname, file=BSFileList)
except OSError as e:
elif self.conf.argdict['allowRename'].value is True:
    msg.info('Output filename does not end in "._0001.data"; will proceed, but be aware that the internal filename metadata will be wrong')
errmsg = 'Output filename for outputBS_MRGFile must end in "._0001.data" or infile metadata will be wrong'
super(bsMergeExecutor, self).preExecute(input=input, output=output)
msg.info("Using stub file for empty BS output - execution is fake")
super(bsMergeExecutor, self).execute()
except OSError as e:
if 'outputArchFile' not in self.conf.argdict:
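# Generate a small Python wrapper script on the fly; it builds (or appends to) the
# output zip archive and is then executed as this executor's payload command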
with open('zip_wrapper.py', 'w') as zip_wrapper:
    print("import zipfile, os, shutil", file=zip_wrapper)
    if os.path.exists(self.conf.argdict['outputArchFile'].value[0]):
        print("zf = zipfile.ZipFile('{}', mode='a', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
        print("zf = zipfile.ZipFile('{}', mode='w', allowZip64=True)".format(self.conf.argdict['outputArchFile'].value[0]), file=zip_wrapper)
    print("for f in {}:".format(self.conf.argdict['inputDataFile'].value), file=zip_wrapper)
    print("    if zipfile.is_zipfile(f) and '.zip' in f:", file=zip_wrapper)
    print("        archive = zipfile.ZipFile(f, mode='r')", file=zip_wrapper)
    print("        print('Extracting input zip file {0} to temporary directory {1}'.format(f, 'tmp'))", file=zip_wrapper)
    print("        archive.extractall('tmp')", file=zip_wrapper)
    print("        archive.close()", file=zip_wrapper)
    print("        if os.access(f, os.F_OK):", file=zip_wrapper)
    print("            print('Removing input zip file {}'.format(f))", file=zip_wrapper)
    print("            os.unlink(f)", file=zip_wrapper)
    print("        if os.path.isdir('tmp'):", file=zip_wrapper)
    print("            for root, dirs, files in os.walk('tmp'):", file=zip_wrapper)
    print("                for name in files:", file=zip_wrapper)
    print("                    print('Zipping {}'.format(name))", file=zip_wrapper)
    print("                    zf.write(os.path.join(root, name), name, compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
    print("            shutil.rmtree('tmp')", file=zip_wrapper)
    print("    else:", file=zip_wrapper)
    print("        print('Zipping {}'.format(os.path.basename(f)))", file=zip_wrapper)
    print("        zf.write(f, arcname=os.path.basename(f), compress_type=zipfile.ZIP_STORED)", file=zip_wrapper)
    print("        if os.access(f, os.F_OK):", file=zip_wrapper)
    print("            print('Removing input file {}'.format(f))", file=zip_wrapper)
    print("            os.unlink(f)", file=zip_wrapper)
    print("zf.close()", file=zip_wrapper)
os.chmod('zip_wrapper.py', 0o755)
except OSError as e:
    errMsg = 'error writing zip wrapper {fileName}: {error}'.format(fileName = 'zip_wrapper.py',
elif self._exe == 'unarchive':
    for infile in self.conf.argdict['inputArchFile'].value:
        if not zipfile.is_zipfile(infile):
                'An input file is not a zip archive - aborting unpacking')
    self._cmd = ['python']
with open('unarchive_wrapper.py', 'w') as unarchive_wrapper:
    print("import zipfile", file=unarchive_wrapper)
    print("for f in {}:".format(self.conf.argdict['inputArchFile'].value), file=unarchive_wrapper)
    print("    archive = zipfile.ZipFile(f, mode='r')", file=unarchive_wrapper)
    print("    path = '{}'".format(self.conf.argdict['path']), file=unarchive_wrapper)
    print("    print('Extracting archive {0} to {1}'.format(f, path))", file=unarchive_wrapper)
    print("    archive.extractall(path)", file=unarchive_wrapper)
    print("    archive.close()", file=unarchive_wrapper)
os.chmod('unarchive_wrapper.py', 0o755)
except OSError as e:
    errMsg = 'error writing unarchive wrapper {fileName}: {error}'.format(fileName = 'unarchive_wrapper.py',
super(archiveExecutor, self).preExecute(input=input, output=output)