def preExecute(self, input=None, output=None):
    """Prepare this athena substep for execution.

    Works out how many events the substep is expected to process, picks the
    asetup / container OS setup, settles the AthenaMT / AthenaMP configuration,
    adjusts output file names for multi-step and AthenaMP running, builds the
    job-options input/output file maps, sets up any requested DBRelease,
    prepares the environment and athena command line, calls the base-class
    ``preExecute`` and finally writes the wrapper script used to run athena.

    :param input: collection of input data type names for this substep
        (defaults to an empty set). It is iterated several times, so a set or
        list is expected rather than a one-shot iterator.
    :param output: collection of output data type names for this substep
        (defaults to an empty set).
    :raises trfExceptions.TransformExecutionException: ``TRF_NOEVENTS`` when
        input event testing is enabled and ``skipEvents`` consumes every known
        input event; ``TRF_EXEC_SETUP_FAIL`` when a container OS is required
        but ``--asetup`` was not given.
    :raises ValueError: when per-step lists of extra input arguments do not
        match the number of executor steps.
    :raises KeyError: when a pre-24 release is requested via asetup and
        ``ALRB_USER_PLATFORM`` is not set in the environment.
    """
    # Fresh containers per call instead of shared mutable default arguments.
    if input is None:
        input = set()
    if output is None:
        output = set()

    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    def argValue(key):
        """Return this substep's value of argument ``key``, or None when the argument is absent."""
        if key not in self.conf.argdict:
            return None
        return self.conf.argdict[key].returnMyValue(name=self._name, substep=self._substep,
                                                   first=self.conf.firstExecutor)

    # --- Input event count: largest known count among the checked data types.
    # Entries whose count is 'UNDEFINED' are ignored.
    inputEvents = 0
    dt = ""
    if self._inputDataTypeCountCheck is None:
        self._inputDataTypeCountCheck = input
    for dataType in self._inputDataTypeCountCheck:
        nentries = self.conf.dataDictionary[dataType].nentries
        if nentries == 'UNDEFINED':
            continue
        if nentries > inputEvents:
            inputEvents = nentries
            dt = dataType

    # --- skipEvents / maxEvents for this substep (0 and -1 mean "not set").
    mySkipEvents = argValue('skipEvents')
    if mySkipEvents is None:
        mySkipEvents = 0
    myMaxEvents = argValue('maxEvents')
    if myMaxEvents is None:
        myMaxEvents = -1

    if self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
            'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))

    # --- Number of events this substep is expected to process.
    try:
        if myMaxEvents != -1:
            if self.inData and next(iter(self.inData)) == 'inNULL':
                # NOTE(review): 'inNULL' presumably marks a job without a real
                # input file, so maxEvents alone fixes the count - confirm.
                expectedEvents = myMaxEvents
            else:
                expectedEvents = min(inputEvents - mySkipEvents, myMaxEvents)
        else:
            expectedEvents = inputEvents - mySkipEvents
    except TypeError:
        # Non-numeric event counts make the arithmetic above raise TypeError.
        msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
        expectedEvents = 0

    # --- asetup string and container OS requirements.
    OSSetupString = None
    asetupString = None
    legacyThreadingRelease = False
    if 'asetup' in self.conf.argdict:
        asetupString = argValue('asetup')
        legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
    else:
        msg.info('Asetup report: {0}'.format(asetupReport()))

    if asetupString is not None:
        legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
        # Only pre-24 releases need the host platform; reading it first (as
        # before) raised KeyError for modern releases outside an ALRB setup.
        if legacyOSRelease and "centos7" not in os.environ['ALRB_USER_PLATFORM']:
            OSSetupString = "centos7"
            msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))

    # An explicit --runInContainer always overrides the automatic choice above.
    if 'runInContainer' in self.conf.argdict:
        OSSetupString = argValue('runInContainer')
        msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
    if OSSetupString is not None and asetupString is None:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
            '--asetup must be used for the substep which requires --runInContainer')

    # --- Parallelism (AthenaMT / AthenaMP) configuration.
    if self._onlyMPWithRunargs and any(k in self.conf._argdict for k in self._onlyMPWithRunargs):
        self._onlyMP = True

    parallelRequested = (('multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value) or
                         ('multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value))
    if parallelRequested and 'ATHENA_CORE_NUMBER' not in os.environ:
        msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
    else:
        if not self._disableMT:
            self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)

        if not self._disableMP:
            self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)

        # MP-only configuration: reuse the MT thread count as worker count.
        if self._onlyMP and self._athenaMT > 0:
            msg.info("This configuration does not support MT, falling back to MP")
            if self._athenaMP == 0:
                self._athenaMP = self._athenaMT
            self._athenaMT = 0
            self._athenaConcurrentEvents = 0

        # MT-only configuration: reuse the MP worker count as thread count.
        if self._onlyMT and self._athenaMP > 0:
            msg.info("This configuration does not support MP, using MT")
            if self._athenaMT == 0:
                self._athenaMT = self._athenaMP
                self._athenaConcurrentEvents = self._athenaMP
            self._athenaMP = 0

    # Fewer expected events than workers: run serially instead.
    if not self._disableMP and expectedEvents < self._athenaMP and self._inData != {'inNULL'}:
        msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
        self._disableMP = True
        self._athenaMP = 0

    # --- Multi-step running: suffix outputs with the executor step, always
    # starting again from the name the file had before any suffixing.
    if self.conf.totalExecutorSteps > 1:
        for dataType in output:
            fileArg = self.conf._dataDictionary[dataType]
            if fileArg.originalName:
                fileArg.value[0] = fileArg.originalName
            else:
                fileArg.originalName = fileArg.value[0]
            fileArg.value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
            msg.info("Updated athena output filename for {0} to {1}".format(dataType, fileArg.value[0]))

    # --- AthenaMP-specific options.
    if self._athenaMP > 0:
        self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
        self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
        self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
        self._athenaMPReadEventOrders = ('athenaMPUseEventOrders' in self.conf.argdict and
                                         self.conf._argdict['athenaMPUseEventOrders'].value is True)

        strategy = argValue('athenaMPStrategy')
        self._athenaMPStrategy = strategy if strategy is not None else 'SharedQueue'

        # Merge target sizes are given in MB and stored in bytes; an exact data
        # type match wins, then the first matching glob, then the "ALL" entry.
        if 'athenaMPMergeTargetSize' in self.conf.argdict:
            mergeTargets = self.conf.argdict['athenaMPMergeTargetSize'].value
            for dataType in output:
                fileArg = self.conf._dataDictionary[dataType]
                if dataType in mergeTargets:
                    fileArg.mergeTargetSize = mergeTargets[dataType] * 1000000
                    msg.info('Set target merge size for {0} to {1}'.format(dataType, fileArg.mergeTargetSize))
                    continue
                for mtsType, mtsSize in mergeTargets.items():
                    if fnmatch(dataType, mtsType):
                        fileArg.mergeTargetSize = mtsSize * 1000000
                        msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, fileArg.mergeTargetSize, mtsType))
                        break
                else:
                    # No glob matched (loop ran to completion without break).
                    if "ALL" in mergeTargets:
                        fileArg.mergeTargetSize = mergeTargets["ALL"] * 1000000
                        msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, fileArg.mergeTargetSize))

        # Record the original output names and, unless event service or a
        # shared writer is in use, add the "_000" suffix to athena outputs.
        for dataType in output:
            fileArg = self.conf._dataDictionary[dataType]
            if self.conf.totalExecutorSteps <= 1:
                fileArg.originalName = fileArg.value[0]
            if 'eventService' not in self.conf.argdict or self.conf.argdict['eventService'].value is False:
                if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
                    msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
                else:
                    fileArg.value[0] += "_000"
                    msg.info("Updated athena output filename for {0} to {1}".format(dataType, fileArg.value[0]))
    else:
        self._athenaMPWorkerTopDir = self._athenaMPFileReport = None

    if 'mpi' in self.conf.argdict:
        msg.info("Running in MPI mode")
        mpi.setupMPIConfig(output, self.conf.dataDictionary)

    # --- Job options: input/output file maps handed to the skeleton(s).
    if self._skeleton or self._skeletonCA:
        inputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in input}
        outputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in output}

        # Extra input file arguments addressed to this executor; a list holds
        # one argument per executor step.
        nameForFiles = commonExecutorStepName(self._name)
        for dataArg in self.conf.dataDictionary.values():
            if isinstance(dataArg, list):
                if not dataArg:
                    # Empty list: nothing to add (this used to fall through to
                    # ``dataArg.io`` and raise AttributeError).
                    continue
                if self.conf.totalExecutorSteps <= 1:
                    raise ValueError('Multiple input arguments provided but only running one substep')
                if self.conf.totalExecutorSteps != len(dataArg):
                    raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')

                stepArg = dataArg[self.conf.executorStep]
                if stepArg.io == 'input' and nameForFiles in stepArg.executor:
                    # NOTE(review): the whole per-step list is stored, not
                    # stepArg - confirm getTopOptions expects the list.
                    inputFiles[stepArg.subtype] = dataArg
            elif dataArg.io == 'input' and nameForFiles in dataArg.executor:
                inputFiles[dataArg.subtype] = dataArg

        msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

        self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
                                                                        output = outputFiles)

    # Input/output data types are only known now, so record them here.
    if len(input) > 0:
        self._extraMetadata['inputs'] = list(input)
    if len(output) > 0:
        self._extraMetadata['outputs'] = list(output)

    # --- DBRelease: local tarball (unpacked here) or a cvmfs release name.
    dbrelease = dbsetup = None
    if 'DBRelease' in self.conf.argdict:
        dbrelease = argValue('DBRelease')
        # Guard before islink: os.path.islink(None) raises TypeError.
        if dbrelease and path.islink(dbrelease):
            dbrelease = path.realpath(dbrelease)
        if dbrelease:
            # Tarball file names look like DBRelease-X.Y.Z.tar.gz
            dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
            if dbdMatch:
                msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
                if not os.access(dbrelease, os.R_OK):
                    # Unreadable tarball: fall back to the version number on cvmfs.
                    msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                    msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                    dbrelease = dbdMatch.group(1)
                    dbsetup = cvmfsDBReleaseCheck(dbrelease)
                else:
                    msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
                    unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                    if unpacked:
                        setupDBRelease(dbsetup)
            else:
                # Not a tarball name: pass the value straight to the cvmfs lookup.
                dbsetup = cvmfsDBReleaseCheck(dbrelease)

    # --- Environment and athena command line.
    self._envUpdate = trfEnv.environmentUpdate()
    self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
    self._prepAthenaCommandLine()

    super(athenaExecutor, self).preExecute(input, output)

    # The wrapper is written last because it embeds self._cmd, which must be final.
    msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
    self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
    msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1239
1240