def preExecute(self, input = set(), output = set()):
    """Prepare this athena substep for execution.

    In order, this method: works out how many events are expected to be
    processed (largest input event count, adjusted by skipEvents/maxEvents),
    resolves the asetup and container OS setup strings, decides the
    AthenaMT/AthenaMP configuration, adjusts output file names for executor
    steps and AthenaMP, generates the top options when a skeleton is
    configured, resolves the DBRelease setup, prepares the environment update
    and the athena command line, calls the parent preExecute and finally
    writes the athena wrapper.

    @param input Iterable of data type names used as keys into the
    configuration's data dictionary (inputs of this substep). The default set
    is never mutated in this method, but note it is stored by reference in
    self._inputDataTypeCountCheck when that attribute is None.
    @param output Iterable of data type names used as keys into the
    configuration's data dictionary (outputs of this substep).
    @throws trfExceptions.TransformExecutionException TRF_NOEVENTS if
    skipEvents leaves no events to process; TRF_EXEC_SETUP_FAIL if a container
    OS is requested without --asetup.
    @throws ValueError If list-valued data arguments do not match the number
    of executor steps.
    @throws KeyError If asetup is given and ALRB_USER_PLATFORM is not set in
    the environment.
    """
    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    def substepValue(argName):
        # Value of argument argName for this substep, or None when the
        # argument was not given at all (evaluated once per call site)
        if argName not in self.conf.argdict:
            return None
        return self.conf.argdict[argName].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)

    # Find the input data type with the largest known event count
    inputEvents = 0
    dt = ""
    if self._inputDataTypeCountCheck is None:
        self._inputDataTypeCountCheck = input
    for dataType in self._inputDataTypeCountCheck:
        thisInputEvents = self.conf.dataDictionary[dataType].nentries
        if thisInputEvents == 'UNDEFINED':
            continue
        if thisInputEvents > inputEvents:
            inputEvents = thisInputEvents
            dt = dataType

    # Take skipEvents and maxEvents into account (0 and -1 mean "not set")
    mySkipEvents = substepValue('skipEvents')
    if mySkipEvents is None:
        mySkipEvents = 0

    myMaxEvents = substepValue('maxEvents')
    if myMaxEvents is None:
        myMaxEvents = -1

    # Any events left to process?
    if (self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents):
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
                                                        'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))

    try:
        # Expected number of events to process
        if (myMaxEvents != -1):
            if (self.inData and next(iter(self.inData)) == 'inNULL'):
                # First input data type is 'inNULL': only maxEvents limits the job
                expectedEvents = myMaxEvents
            else:
                expectedEvents = min(inputEvents-mySkipEvents, myMaxEvents)
        else:
            expectedEvents = inputEvents-mySkipEvents
    except TypeError:
        # The arithmetic above failed on a non-numeric event count
        msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
        expectedEvents = 0

    # Container OS to set up, if any
    OSSetupString = None

    # Extract the asetup string
    asetupString = None
    legacyThreadingRelease = False
    if 'asetup' in self.conf.argdict:
        asetupString = substepValue('asetup')
        legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
    else:
        msg.info('Asetup report: {0}'.format(asetupReport()))

    if asetupString is not None:
        legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
        # Raises KeyError when ALRB_USER_PLATFORM is not set in the environment
        currentOS = os.environ['ALRB_USER_PLATFORM']
        if legacyOSRelease and "centos7" not in currentOS:
            OSSetupString = "centos7"
            msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))

    # An explicit runInContainer request overrides the automatic choice
    if 'runInContainer' in self.conf.argdict:
        OSSetupString = substepValue('runInContainer')
        msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
    if OSSetupString is not None and asetupString is None:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                        '--asetup must be used for the substep which requires --runInContainer')

    # Conditional MP: any of the listed runtime arguments forces MP-only
    if self._onlyMPWithRunargs and any(k in self.conf._argdict for k in self._onlyMPWithRunargs):
        self._onlyMP = True

    # Check the consistency of the parallel configuration: CLI flags + environment
    parallelRequested = (('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or
                         ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value))
    if parallelRequested and ('ATHENA_CORE_NUMBER' not in os.environ):
        msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
    else:
        # Detect AthenaMT mode: number of threads and concurrent events
        if not self._disableMT:
            self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)

        # Detect AthenaMP mode: number of workers
        if not self._disableMP:
            self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)

        # MP-only configuration: move any MT request over to MP
        if self._onlyMP and self._athenaMT > 0:
            msg.info("This configuration does not support MT, falling back to MP")
            if self._athenaMP == 0:
                self._athenaMP = self._athenaMT
            self._athenaMT = 0
            self._athenaConcurrentEvents = 0

        # MT-only configuration: move any MP request over to MT
        if self._onlyMT and self._athenaMP > 0:
            msg.info("This configuration does not support MP, using MT")
            if self._athenaMT == 0:
                self._athenaMT = self._athenaMP
                self._athenaConcurrentEvents = self._athenaMP
            self._athenaMP = 0

        # Fewer events than workers: drop MP, unless the only input is 'inNULL'
        if not self._disableMP and expectedEvents < self._athenaMP and not self._inData=={'inNULL'}:
            msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
            self._disableMP = True
            self._athenaMP = 0

        # Executor steps: suffix each output file name with the current step,
        # always starting from the original (unsuffixed) name
        if self.conf.totalExecutorSteps > 1:
            for dataType in output:
                outArg = self.conf._dataDictionary[dataType]
                if outArg.originalName:
                    outArg.value[0] = outArg.originalName
                else:
                    outArg.originalName = outArg.value[0]
                outArg.value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
                msg.info("Updated athena output filename for {0} to {1}".format(dataType, outArg.value[0]))

        # If this is (still) AthenaMP, set options for workers and the output file report
        if self._athenaMP > 0:
            self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
            self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
            self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
            self._athenaMPReadEventOrders = ('athenaMPUseEventOrders' in self.conf.argdict and
                                             self.conf._argdict['athenaMPUseEventOrders'].value is True)

            # Scheduling strategy, 'SharedQueue' unless given for this substep
            self._athenaMPStrategy = substepValue('athenaMPStrategy')
            if self._athenaMPStrategy is None:
                self._athenaMPStrategy = 'SharedQueue'

            # Target merge sizes: exact data type match first, then glob
            # patterns, then the "ALL" entry; given values are scaled by 1000000
            if 'athenaMPMergeTargetSize' in self.conf.argdict:
                targetSizes = self.conf.argdict['athenaMPMergeTargetSize'].value
                for dataType in output:
                    outArg = self.conf._dataDictionary[dataType]
                    if dataType in targetSizes:
                        outArg.mergeTargetSize = targetSizes[dataType] * 1000000
                        msg.info('Set target merge size for {0} to {1}'.format(dataType, outArg.mergeTargetSize))
                    else:
                        matchedViaGlob = False
                        for mtsType, mtsSize in targetSizes.items():
                            if fnmatch(dataType, mtsType):
                                outArg.mergeTargetSize = mtsSize * 1000000
                                msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, outArg.mergeTargetSize, mtsType))
                                matchedViaGlob = True
                                break
                        if not matchedViaGlob and "ALL" in targetSizes:
                            outArg.mergeTargetSize = targetSizes["ALL"] * 1000000
                            msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, outArg.mergeTargetSize))

            # Rename the outputs with a "_000" suffix, except when eventService
            # is enabled or sharedWriter is set
            for dataType in output:
                outArg = self.conf._dataDictionary[dataType]
                if self.conf.totalExecutorSteps <= 1:
                    outArg.originalName = outArg.value[0]
                if 'eventService' not in self.conf.argdict or self.conf.argdict['eventService'].value is False:
                    if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
                        msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
                    else:
                        outArg.value[0] += "_000"
                        msg.info("Updated athena output filename for {0} to {1}".format(dataType, outArg.value[0]))
        else:
            self._athenaMPWorkerTopDir = self._athenaMPFileReport = None

    # Setup for MPI
    if 'mpi' in self.conf.argdict:
        msg.info("Running in MPI mode")
        mpi.setupMPIConfig(output, self.conf.dataDictionary)

    # Collect the input/output arguments and generate the top options
    if self._skeleton or self._skeletonCA:
        inputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in input}
        outputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in output}

        # Also pick up any other 'input' argument attached to this executor
        nameForFiles = commonExecutorStepName(self._name)
        for dataType, dataArg in self.conf.dataDictionary.items():
            if isinstance(dataArg, list) and dataArg:
                # List-valued argument: one entry per executor step
                if self.conf.totalExecutorSteps <= 1:
                    raise ValueError('Multiple input arguments provided but only running one substep')
                if self.conf.totalExecutorSteps != len(dataArg):
                    raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')

                stepArg = dataArg[self.conf.executorStep]
                if stepArg.io == 'input' and nameForFiles in stepArg.executor:
                    inputFiles[stepArg.subtype] = dataArg
            else:
                if dataArg.io == 'input' and nameForFiles in dataArg.executor:
                    inputFiles[dataArg.subtype] = dataArg

        msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

        self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
                                                                       output = outputFiles)

    # Record input/output data types in the extra metadata
    if len(input) > 0:
        self._extraMetadata['inputs'] = list(input)
    if len(output) > 0:
        self._extraMetadata['outputs'] = list(output)

    # Resolve the DBRelease setup, if one was requested
    dbrelease = dbsetup = None
    if 'DBRelease' in self.conf.argdict:
        dbrelease = substepValue('DBRelease')
        if dbrelease:
            # Only touch the filesystem once a value is known to be set:
            # path.islink(None) raises TypeError
            if path.islink(dbrelease):
                dbrelease = path.realpath(dbrelease)
            # Classic tarball: file name format is DBRelease-X.Y.Z.tar.gz
            dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
            if dbdMatch:
                msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
                if not os.access(dbrelease, os.R_OK):
                    msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                    msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                    dbrelease = dbdMatch.group(1)
                    dbsetup = cvmfsDBReleaseCheck(dbrelease)
                else:
                    # Readable tarball: unpack it and run its setup if newly unpacked
                    msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
                    unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                    if unpacked:
                        setupDBRelease(dbsetup)
            else:
                # Not a tarball name: pass the value to the cvmfs check as is
                dbsetup = cvmfsDBReleaseCheck(dbrelease)

    # Look for environment updates and prepare the athena command line
    self._envUpdate = trfEnv.environmentUpdate()
    self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
    self._prepAthenaCommandLine()

    super(athenaExecutor, self).preExecute(input, output)

    # Write the wrapper script that sets up the release/DBRelease/container
    msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
    self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
    msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1245
1246