def preExecute(self, input=None, output=None):
    """Prepare this athena substep for execution.

    In order, this method:
      1. works out how many events are expected to be processed from the input
         entry counts and the skipEvents / maxEvents arguments;
      2. decides whether a different release (--asetup) and/or a container
         (legacy release or --runInContainer) is needed;
      3. settles the AthenaMT / AthenaMP configuration and adjusts the output
         file names for multi-step and MP running;
      4. builds the input/output file maps and asks the job options template
         for the top options files;
      5. resolves the DBRelease setup;
      6. updates the environment, prepares the athena command line, calls the
         parent preExecute and finally writes the athena wrapper script.

    @param input   Iterable of input data type names (default: empty set)
    @param output  Iterable of output data type names (default: empty set)

    @throws trfExceptions.TransformExecutionException  if skipEvents leaves no
            events to process (TRF_NOEVENTS), or if --runInContainer is used
            without --asetup (TRF_EXEC_SETUP_FAIL)
    @throws ValueError  if per-step data arguments are given but the number of
            entries does not match the number of executor steps
    """
    # NOTE(review): the listing this method was reviewed from had lost its
    # indentation and a number of short lines (initialisations, else/try
    # branches, a few helper calls).  Nesting and the missing statements were
    # restored from context; lines that call helpers not visible in that listing
    # carry their own NOTE(review) below - confirm them against the repository.

    # The defaults used to be shared `set()` objects; one of them is stored on
    # the instance below, so each call now gets its own empty set instead.
    if input is None:
        input = set()
    if output is None:
        output = set()

    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    def mySubstepValue(key):
        # Value of command line argument `key` as it applies to this substep.
        return self.conf.argdict[key].returnMyValue(name=self._name, substep=self._substep,
                                                    first=self.conf.firstExecutor)

    # ------------------------------------------------------------------
    # Check we actually have events to process
    # ------------------------------------------------------------------
    inputEvents = 0
    dt = ""
    if self._inputDataTypeCountCheck is None:
        self._inputDataTypeCountCheck = input
    for dataType in self._inputDataTypeCountCheck:
        thisInputEvents = self.conf.dataDictionary[dataType].nentries
        if thisInputEvents == 'UNDEFINED':
            # No usable count for this data type
            continue
        # Keep the largest entry count and remember which data type it came from
        if thisInputEvents > inputEvents:
            inputEvents = thisInputEvents
            dt = dataType

    # Take skipEvents and maxEvents into account (0 / -1 when not given for this substep)
    mySkipEvents = 0
    if 'skipEvents' in self.conf.argdict:
        requestedSkip = mySubstepValue('skipEvents')
        if requestedSkip is not None:
            mySkipEvents = requestedSkip

    myMaxEvents = -1
    if 'maxEvents' in self.conf.argdict:
        requestedMax = mySubstepValue('maxEvents')
        if requestedMax is not None:
            myMaxEvents = requestedMax

    # Any events to process...?
    if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
        raise trfExceptions.TransformExecutionException(
            trfExit.nameToCode('TRF_NOEVENTS'),
            'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))

    try:
        # Expected number of events to process
        if myMaxEvents != -1:
            if (self.inData and next(iter(self.inData)) == 'inNULL'):
                # No real input: maxEvents alone decides
                expectedEvents = myMaxEvents
            else:
                expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
        else:
            expectedEvents = inputEvents-mySkipEvents
    except TypeError:
        # Non-numeric event count or argument value
        msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
        expectedEvents = 0

    # ------------------------------------------------------------------
    # Release (asetup) and container requirements
    # ------------------------------------------------------------------
    OSSetupString = None
    asetupString = None
    legacyThreadingRelease = False
    if 'asetup' in self.conf.argdict:
        asetupString = mySubstepValue('asetup')
        # NOTE(review): restored call - release threshold 22 to be confirmed
        legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
    else:
        # NOTE(review): restored line - confirm helper name and message text
        msg.info('Asetup report: {0}'.format(asetupReport()))

    if asetupString is not None:
        # NOTE(review): restored call - release threshold 24 to be confirmed
        legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
        # Raises KeyError if ALRB_USER_PLATFORM is not set in the environment
        currentOS = os.environ['ALRB_USER_PLATFORM']
        if legacyOSRelease and "centos7" not in currentOS:
            OSSetupString = "centos7"
            msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))

    # An explicit --runInContainer overrides the automatic choice above
    if 'runInContainer' in self.conf.argdict:
        OSSetupString = mySubstepValue('runInContainer')
        msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
        if OSSetupString is not None and asetupString is None:
            raise trfExceptions.TransformExecutionException(
                trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                '--asetup must be used for the substep which requires --runInContainer')

    # ------------------------------------------------------------------
    # Parallel (MT / MP) configuration
    # ------------------------------------------------------------------
    # Conditional MP based on runtime arguments
    if self._onlyMPWithRunargs:
        for k in self._onlyMPWithRunargs:
            if k in self.conf._argdict:
                self._onlyMP = True

    parallelRequested = (('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or
                         ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value))
    if parallelRequested and 'ATHENA_CORE_NUMBER' not in os.environ:
        # A parallel flag was given but the core count is not available in the environment
        msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
    else:
        # Detect AthenaMT mode, number of threads and number of concurrent events
        if not self._disableMT:
            self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)

        # Detect AthenaMP mode and number of workers
        if not self._disableMP:
            self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)

        # MT requested but only MP is supported
        if self._onlyMP and self._athenaMT > 0:
            msg.info("This configuration does not support MT, falling back to MP")
            if self._athenaMP == 0:
                self._athenaMP = self._athenaMT
            self._athenaMT = 0
            self._athenaConcurrentEvents = 0

        # MP requested but only MT is supported
        if self._onlyMT and self._athenaMP > 0:
            msg.info("This configuration does not support MP, using MT")
            if self._athenaMT == 0:
                self._athenaMT = self._athenaMP
                self._athenaConcurrentEvents = self._athenaMP
            self._athenaMP = 0

        # Fewer events than workers: not worth running MP (the no-input case is exempt)
        if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
            msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
            self._disableMP = True
            self._athenaMP = 0

        # Multi-step running: give every step its own output file name,
        # always derived from the original (un-suffixed) name
        if self.conf.totalExecutorSteps > 1:
            for dataType in output:
                outArg = self.conf._dataDictionary[dataType]
                if outArg.originalName:
                    outArg.value[0] = outArg.originalName
                else:
                    outArg.originalName = outArg.value[0]
                outArg.value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
                msg.info("Updated athena output filename for {0} to {1}".format(dataType, outArg.value[0]))

        # If this is (still) AthenaMP, set options for workers and the output file report
        if self._athenaMP > 0:
            self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
            self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
            self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
            self._athenaMPReadEventOrders = ('athenaMPUseEventOrders' in self.conf.argdict and
                                             self.conf._argdict['athenaMPUseEventOrders'].value is True)

            # Scheduling strategy, 'SharedQueue' unless set for this substep
            self._athenaMPStrategy = 'SharedQueue'
            if 'athenaMPStrategy' in self.conf.argdict:
                requestedStrategy = mySubstepValue('athenaMPStrategy')
                if requestedStrategy is not None:
                    self._athenaMPStrategy = requestedStrategy

            # Target output file sizes: exact data type first, then glob patterns, then "ALL"
            if 'athenaMPMergeTargetSize' in self.conf.argdict:
                mergeTargets = self.conf.argdict['athenaMPMergeTargetSize'].value
                for dataType in output:
                    outArg = self.conf._dataDictionary[dataType]
                    if dataType in mergeTargets:
                        outArg.mergeTargetSize = mergeTargets[dataType] * 1000000
                        msg.info('Set target merge size for {0} to {1}'.format(dataType, outArg.mergeTargetSize))
                        continue
                    for mtsType, mtsSize in mergeTargets.items():
                        if fnmatch(dataType, mtsType):
                            outArg.mergeTargetSize = mtsSize * 1000000
                            msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, outArg.mergeTargetSize, mtsType))
                            break
                    else:
                        # No glob matched
                        if "ALL" in mergeTargets:
                            outArg.mergeTargetSize = mergeTargets["ALL"] * 1000000
                            msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, outArg.mergeTargetSize))

            # Output file names get a "_000" suffix unless event service is on
            # or a shared writer is used
            for dataType in output:
                outArg = self.conf._dataDictionary[dataType]
                if self.conf.totalExecutorSteps <= 1:
                    outArg.originalName = outArg.value[0]
                if 'eventService' not in self.conf.argdict or self.conf.argdict['eventService'].value is False:
                    if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
                        msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
                    else:
                        outArg.value[0] += "_000"
                        msg.info("Updated athena output filename for {0} to {1}".format(dataType, outArg.value[0]))
        else:
            self._athenaMPWorkerTopDir = self._athenaMPFileReport = None

    if 'mpi' in self.conf.argdict:
        msg.info("Running in MPI mode")
        mpi.setupMPIConfig(output, self.conf.dataDictionary)

    # ------------------------------------------------------------------
    # Top options files
    # ------------------------------------------------------------------
    if self._skeleton or self._skeletonCA:
        inputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in input}
        outputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in output}

        # Add any 'extra' input file arguments addressed to this executor
        # NOTE(review): restored call - confirm helper name
        nameForFiles = commonExecutorStepName(self._name)
        for dataType, dataArg in self.conf.dataDictionary.items():
            if isinstance(dataArg, list) and dataArg:
                # One argument per executor step
                if self.conf.totalExecutorSteps <= 1:
                    raise ValueError('Multiple input arguments provided but only running one substep')
                if self.conf.totalExecutorSteps != len(dataArg):
                    raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')

                stepArg = dataArg[self.conf.executorStep]
                if stepArg.io == 'input' and nameForFiles in stepArg.executor:
                    # The complete per-step list is stored, as before
                    inputFiles[stepArg.subtype] = dataArg
            else:
                if dataArg.io == 'input' and nameForFiles in dataArg.executor:
                    inputFiles[dataArg.subtype] = dataArg

        msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

        # Top options files that will be passed to athena
        self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
                                                                       output = outputFiles)

    # Input/output information is only known here, not at construction time
    if input:
        self._extraMetadata['inputs'] = list(input)
    if output:
        self._extraMetadata['outputs'] = list(output)

    # ------------------------------------------------------------------
    # DBRelease setup
    # ------------------------------------------------------------------
    dbrelease = dbsetup = None
    if 'DBRelease' in self.conf.argdict:
        dbrelease = mySubstepValue('DBRelease')
        if dbrelease:
            # Only resolve links for a real value; path.islink(None) raises TypeError
            if path.islink(dbrelease):
                dbrelease = path.realpath(dbrelease)
            # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
            dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
            if dbdMatch:
                msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
                if not os.access(dbrelease, os.R_OK):
                    msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                    msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                    dbrelease = dbdMatch.group(1)
                    # NOTE(review): restored call - confirm helper name
                    dbsetup = cvmfsDBReleaseCheck(dbrelease)
                else:
                    msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
                    unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                    if unpacked:
                        # NOTE(review): restored call - confirm helper name
                        setupDBRelease(dbsetup)
            else:
                # Not a tarball name: treat the value as a release string
                # NOTE(review): restored call - confirm helper name
                dbsetup = cvmfsDBReleaseCheck(dbrelease)

    # ------------------------------------------------------------------
    # Environment, command line and wrapper
    # ------------------------------------------------------------------
    self._envUpdate = trfEnv.environmentUpdate()
    self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
    self._prepAthenaCommandLine()

    super(athenaExecutor, self).preExecute(input, output)

    # The wrapper is written last because it needs the final self._cmd
    msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
    self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
    msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))