969 def preExecute(self, input = set(), output =
set()):
# Prepare this athena substep for execution. Visible steps, in order:
#   1. record the pre-execution start and find the largest known input event
#      count;
#   2. work out the expected number of events (honouring skipEvents/maxEvents);
#   3. resolve the asetup string and any container OS setup;
#   4. decide the AthenaMT / AthenaMP configuration;
#   5. adjust output file names for multi-step and AthenaMP running;
#   6. when a skeleton is configured, build top options from the job options
#      template;
#   7. record input/output metadata and resolve DBRelease;
#   8. build the environment update and command line, call the parent class
#      preExecute, and write the athena wrapper script.
#
# Parameters:
#   input  - collection of input data type names (iterated, and stored as
#            list(input) in self._extraMetadata['inputs']).
#   output - collection of output data type names (iterated, and stored as
#            list(output) in self._extraMetadata['outputs']).
# Raises:
#   trfExceptions.TransformExecutionException (TRF_NOEVENTS) when
#   self._inputEventTest is set and skipEvents >= the input event count.
#   trfExceptions.TransformExecutionException (TRF_EXEC_SETUP_FAIL) when a
#   container OS is set but no asetup string is available.
#   ValueError when a list-valued data argument does not match the number of
#   executor steps.
#
# NOTE(review): this listing is an extract. Several statements are not
# visible here, e.g. the initialisation of inputEvents, dt, mySkipEvents,
# myMaxEvents, asetupString, legacyOSRelease, inputFiles and nameForFiles,
# and most `else:` branches. The comments below hedge where they depend on
# those lines.
# NOTE(review): `input = set()` / `output = set()` are mutable defaults. The
# default `input` set can be stored in self._inputDataTypeCountCheck below.
# It is not mutated in the visible code; confirm nothing else mutates it.
970 self.setPreExeStart()
971 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.
format(self.name, input, output))
# Find the largest event count among the data types used for the input
# event count check (falls back to `input` when no explicit list was set).
# NOTE(review): the body of the 'UNDEFINED' test is not visible; presumably
# such data types are skipped. `dt` (used in the TRF_NOEVENTS message) is
# presumably set to the winning dataType on a hidden line. Confirm both.
976 if self._inputDataTypeCountCheck
is None:
977 self._inputDataTypeCountCheck = input
978 for dataType
in self._inputDataTypeCountCheck:
979 if self.conf.dataDictionary[dataType].nentries ==
'UNDEFINED':
982 thisInputEvents = self.conf.dataDictionary[dataType].nentries
983 if thisInputEvents > inputEvents:
984 inputEvents = thisInputEvents
# Read this substep's skipEvents / maxEvents when the argument exists and
# returnMyValue gives a non-None value for this executor/substep.
988 if (
'skipEvents' in self.conf.argdict
and
989 self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
is not None):
990 mySkipEvents = self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
994 if (
'maxEvents' in self.conf.argdict
and
995 self.conf.argdict[
'maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
is not None):
996 myMaxEvents = self.conf.argdict[
'maxEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
# Refuse to run when skipEvents would skip every input event.
# NOTE(review): the message text has no closing ')' after '{2}'
# ("(inputEvents of {2}"). This is a cosmetic issue in the runtime string.
1001 if (self._inputEventTest
and mySkipEvents > 0
and mySkipEvents >= inputEvents):
1002 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
'TRF_NOEVENTS'),
1003 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.
format(mySkipEvents, inputEvents, dt))
# Expected number of events to process:
#   - maxEvents set (!= -1) and the first inData entry is 'inNULL':
#     maxEvents itself, since there is no real input to count;
#   - maxEvents set otherwise: min(inputEvents - skipEvents, maxEvents);
#   - 1013: inputEvents - skipEvents (presumably the branch where maxEvents
#     is -1; the `else:` lines are not visible here).
1007 if (myMaxEvents != -1):
1008 if (self.inData
and next(iter(self.inData)) ==
'inNULL'):
1009 expectedEvents = myMaxEvents
1011 expectedEvents =
min(inputEvents-mySkipEvents, myMaxEvents)
1013 expectedEvents = inputEvents-mySkipEvents
# NOTE(review): the enclosing branch for this log line is not visible. Per
# its message it handles an UNDEFINED input event count and sets
# expectedEvents to 0 (the assignment itself is not shown). Confirm.
1016 msg.info(
'input event count is UNDEFINED, setting expectedEvents to 0')
# Container / OS setup. OSSetupString stays None unless a legacy release
# needs a centos7 container, or --runInContainer names one explicitly.
# legacyThreadingRelease defaults to False and is passed to
# detectAthenaMTThreads / detectAthenaMPProcs below. Where it becomes True
# is not visible here.
1022 OSSetupString =
None
1026 legacyThreadingRelease =
False
1027 if 'asetup' in self.conf.argdict:
1028 asetupString = self.conf.argdict[
'asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
# NOTE(review): asetupString and legacyOSRelease are not assigned on any
# visible line outside the branch above. Presumably they are set on hidden
# lines (e.g. asetupString defaults to None). Confirm.
1033 if asetupString
is not None:
# NOTE(review): os.environ[...] raises KeyError if ALRB_USER_PLATFORM is
# not set. This requires the ALRB environment whenever asetup is used.
1035 currentOS = os.environ[
'ALRB_USER_PLATFORM']
1036 if legacyOSRelease
and "centos7" not in currentOS:
1037 OSSetupString =
"centos7"
1038 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.
format(self._substep, OSSetupString))
# An explicit --runInContainer value overrides OSSetupString. Running in a
# container without an asetup string is a setup failure.
1042 if 'runInContainer' in self.conf.argdict:
1043 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1044 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.
format(self._substep, OSSetupString))
1045 if OSSetupString
is not None and asetupString
is None:
1046 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
'TRF_EXEC_SETUP_FAIL'),
1047 '--asetup must be used for the substep which requires --runInContainer')
# Check whether any of the "only MP with these runargs" arguments were given.
# NOTE(review): the action taken on a match is not visible here (presumably
# it switches this executor to MP-only). Confirm.
1050 if self._onlyMPWithRunargs:
1051 for k
in self._onlyMPWithRunargs:
1052 if k
in self.conf._argdict:
# Warn when --multithreaded or --multiprocess is true but ATHENA_CORE_NUMBER
# is not in the environment; per the message, athena then runs serially.
# NOTE(review): this block reads self.conf._argdict while most of the method
# uses self.conf.argdict. Presumably both expose the same mapping; confirm.
1056 if (((
'multithreaded' in self.conf._argdict
and self.conf._argdict[
'multithreaded'].value)
or (
'multiprocess' in self.conf._argdict
and self.conf._argdict[
'multiprocess'].value))
and
1057 (
'ATHENA_CORE_NUMBER' not in os.environ)):
1059 msg.warning(
'either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
# Detect the requested MT threads / concurrent events and MP processes,
# unless MT or MP has been disabled for this executor.
1062 if not self._disableMT:
1063 self._athenaMT, self._athenaConcurrentEvents =
detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)
1066 if not self._disableMP:
1067 self._athenaMP =
detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)
# MP-only executor asked for MT: use the MT thread count as the MP worker
# count (if no MP count was requested) and clear concurrent events.
1070 if self._onlyMP
and self._athenaMT > 0:
1071 msg.info(
"This configuration does not support MT, falling back to MP")
1072 if self._athenaMP == 0:
1073 self._athenaMP = self._athenaMT
1075 self._athenaConcurrentEvents = 0
# MT-only executor asked for MP: use the MP process count as the MT thread
# count and as the number of concurrent events.
1078 if self._onlyMT
and self._athenaMP > 0:
1079 msg.info(
"This configuration does not support MP, using MT")
1080 if self._athenaMT == 0:
1081 self._athenaMT = self._athenaMP
1082 self._athenaConcurrentEvents = self._athenaMP
# Disable AthenaMP when there are fewer expected events than MP workers.
1087 if not self._disableMP
and expectedEvents < self._athenaMP:
1088 msg.info(
"Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".
format(expectedEvents, self._athenaMP))
1089 self._disableMP =
True
# Multi-step executors: give each output file a per-step suffix. value[0] is
# first reset from originalName when one is already stored; otherwise the
# current name is saved as originalName. This way suffixes do not pile up
# across steps. (executorStepSuffix is not defined in this view.)
1093 if self.conf.totalExecutorSteps > 1:
1094 for dataType
in output:
1095 if self.conf._dataDictionary[dataType].originalName:
1096 self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
1098 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1099 self.conf._dataDictionary[dataType].value[0] +=
"_{0}{1}".
format(executorStepSuffix, self.conf.executorStep)
1100 msg.info(
"Updated athena output filename for {0} to {1}".
format(dataType, self.conf._dataDictionary[dataType].value[0]))
# AthenaMP configuration: worker top dir, output file report and event
# orders file names; whether to read event orders; MP strategy (falls back to
# 'SharedQueue', presumably on the hidden `else:` of the strategy test).
# NOTE(review): the athenaMPUseEventOrders membership test uses
# self.conf.argdict, but the value is read from self.conf._argdict.
1103 if self._athenaMP > 0:
1104 self._athenaMPWorkerTopDir =
'athenaMP-workers-{0}-{1}'.
format(self._name, self._substep)
1105 self._athenaMPFileReport =
'athenaMP-outputs-{0}-{1}'.
format(self._name, self._substep)
1106 self._athenaMPEventOrdersFile =
'athenamp_eventorders.txt.{0}'.
format(self._name)
1107 if 'athenaMPUseEventOrders' in self.conf.argdict
and self.conf._argdict[
'athenaMPUseEventOrders'].value
is True:
1108 self._athenaMPReadEventOrders =
True
1110 self._athenaMPReadEventOrders =
False
1112 if (
'athenaMPStrategy' in self.conf.argdict
and
1113 (self.conf.argdict[
'athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
is not None)):
1114 self._athenaMPStrategy = self.conf.argdict[
'athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
1116 self._athenaMPStrategy =
'SharedQueue'
# Per-output merge target size from athenaMPMergeTargetSize, scaled by
# 1000000 (presumably MB -> bytes; confirm the argument's units). Lookup
# order:
#   1. an exact data type key;
#   2. otherwise fnmatch glob keys (presumably on the hidden `else:`);
#   3. the "ALL" entry when no glob matched.
1119 if 'athenaMPMergeTargetSize' in self.conf.argdict:
1120 for dataType
in output:
1121 if dataType
in self.conf.argdict[
'athenaMPMergeTargetSize'].value:
1122 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict[
'athenaMPMergeTargetSize'].value[dataType] * 1000000
1123 msg.info(
'Set target merge size for {0} to {1}'.
format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
1126 matchedViaGlob =
False
1127 for mtsType, mtsSize
in self.conf.argdict[
'athenaMPMergeTargetSize'].value.items():
1128 if fnmatch(dataType, mtsType):
1129 self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000
1130 msg.info(
'Set target merge size for {0} to {1} from "{2}" glob'.
format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType))
1131 matchedViaGlob =
True
1133 if not matchedViaGlob
and "ALL" in self.conf.argdict[
'athenaMPMergeTargetSize'].value:
1134 self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict[
'athenaMPMergeTargetSize'].value[
"ALL"] * 1000000
1135 msg.info(
'Set target merge size for {0} to {1} from "ALL" value'.
format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize))
# For each output: single-step runs save the current name as originalName.
# Without event service, the output name gets a "_000" suffix unless
# sharedWriter is enabled.
# The eventService test reads as "absent, or present with value False":
# `and` binds tighter than `or`.
1140 for dataType
in output:
1141 if self.conf.totalExecutorSteps <= 1:
1142 self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
1143 if 'eventService' not in self.conf.argdict
or 'eventService' in self.conf.argdict
and self.conf.argdict[
'eventService'].value
is False:
1144 if 'sharedWriter' in self.conf.argdict
and self.conf.argdict[
'sharedWriter'].value:
1145 msg.info(
"SharedWriter: not updating athena output filename for {0}".
format(dataType))
1147 self.conf._dataDictionary[dataType].value[0] +=
"_000"
1148 msg.info(
"Updated athena output filename for {0} to {1}".
format(dataType, self.conf._dataDictionary[dataType].value[0]))
# No AthenaMP (presumably the hidden `else:` of `if self._athenaMP > 0`):
# clear the worker dir and file report names.
1150 self._athenaMPWorkerTopDir = self._athenaMPFileReport =
None
# With a skeleton (self._skeleton or self._skeletonCA), map input/output data
# types to their data dictionary arguments, then generate top options from
# the job options template.
# NOTE(review): inputFiles is not initialised on any visible line; presumably
# it is a dict created on a hidden line. Confirm.
1154 if self._skeleton
or self._skeletonCA:
1156 for dataType
in input:
1157 inputFiles[dataType] = self.conf.dataDictionary[dataType]
1158 outputFiles = dict()
1159 for dataType
in output:
1160 outputFiles[dataType] = self.conf.dataDictionary[dataType]
# Also add every 'input' data argument whose executor list contains
# nameForFiles. A list-valued argument must have one entry per executor step,
# and the entry for the current step is tested.
# NOTE(review): nameForFiles is not assigned on any visible line.
1164 for dataType, dataArg
in self.conf.dataDictionary.items():
1165 if isinstance(dataArg, list)
and dataArg:
1166 if self.conf.totalExecutorSteps <= 1:
1167 raise ValueError(
'Multiple input arguments provided but only running one substep')
1168 if self.conf.totalExecutorSteps != len(dataArg):
1169 raise ValueError(f
'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')
# NOTE(review): the assignment on 1172 stores the whole list `dataArg`, not
# dataArg[self.conf.executorStep]. The test uses the per-step element;
# confirm the list is what getTopOptions expects.
1171 if dataArg[self.conf.executorStep].io ==
'input' and nameForFiles
in dataArg[self.conf.executorStep].executor:
1172 inputFiles[dataArg[self.conf.executorStep].subtype] = dataArg
1174 if dataArg.io ==
'input' and nameForFiles
in dataArg.executor:
1175 inputFiles[dataArg.subtype] = dataArg
1177 msg.debug(
'Input Files: {0}; Output Files: {1}'.
format(inputFiles, outputFiles))
# Generate the top-level options files from the collected input/output maps.
1180 self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
1181 output = outputFiles)
# Record the input and output data types in the extra metadata.
1186 self._extraMetadata[
'inputs'] =
list(input)
1188 self._extraMetadata[
'outputs'] =
list(output)
# DBRelease handling. dbrelease and dbsetup default to None; dbsetup is
# passed to the wrapper writer at the end.
1191 dbrelease = dbsetup =
None
1192 if 'DBRelease' in self.conf.argdict:
1193 dbrelease = self.conf.argdict[
'DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
# Resolve symlinks, then test for a classic "DBRelease-<version>.tar.gz" name.
# If that tarball is not readable, fall back to looking up the version in
# cvmfs. Otherwise unpack the tarball for that version; the branch structure
# between 1204 and 1208 is not visible here.
# NOTE(review): if returnMyValue can return None here, path.islink /
# path.basename raise TypeError. dbdMatch.group(1) also assumes the regex
# matched; presumably a hidden `if dbdMatch:` guards it. Confirm.
1194 if path.islink(dbrelease):
1195 dbrelease = path.realpath(dbrelease)
1198 dbdMatch = re.match(
r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
1200 msg.debug(
'DBRelease setting {0} matches classic tarball file'.
format(dbrelease))
1201 if not os.access(dbrelease, os.R_OK):
1202 msg.warning(
'Transform was given tarball DBRelease file {0}, but this is not there'.
format(dbrelease))
1203 msg.warning(
'I will now try to find DBRelease {0} in cvmfs'.
format(dbdMatch.group(1)))
1204 dbrelease = dbdMatch.group(1)
1208 msg.debug(
'Setting up {0} from {1}'.
format(dbdMatch.group(1), dbrelease))
1209 unpacked, dbsetup =
unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
# Build the standard environment update for this substep and prepare the
# athena command line.
1218 self._envUpdate = trfEnv.environmentUpdate()
1219 self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
1220 self._prepAthenaCommandLine()
# Run the parent class's preExecute with the same inputs/outputs.
1223 super(athenaExecutor, self).preExecute(input, output)
# Write the wrapper script with the asetup, DB and OS setup resolved above.
# Per the final log message, athena is then run in a subshell via self._cmd.
1228 msg.info(
'Now writing wrapper for substep executor {0}'.
format(self._name))
1229 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
1230 msg.info(
'Athena will be executed in a subshell via {0}'.
format(self._cmd))