def preExecute(self, input=None, output=None):
    """Prepare this athena substep for execution.

    Determines the expected number of events, the release / container setup,
    the AthenaMT / AthenaMP configuration, output file naming and DBRelease
    setup, then builds the athena command line, calls the base-class
    preExecute and writes the wrapper script that will run athena.

    :param input: data types consumed by this substep (default: empty set)
    :param output: data types produced by this substep (default: empty set)
    :raises trfExceptions.TransformExecutionException: with ``TRF_NOEVENTS``
        when ``skipEvents`` leaves no input events, or with
        ``TRF_EXEC_SETUP_FAIL`` when a container OS is chosen but no
        ``--asetup`` is given
    :raises ValueError: when a list-valued data argument does not match the
        number of executor steps
    """
    # Fresh sets per call: a shared mutable default could be aliased by
    # self._inputDataTypeCountCheck below.
    input = set() if input is None else input
    output = set() if output is None else output

    self.setPreExeStart()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    def _substepArgValue(argName, default=None):
        # Value of argName for this substep, or `default` when the argument is
        # absent or returnMyValue yields None for this substep.
        if argName not in self.conf.argdict:
            return default
        value = self.conf.argdict[argName].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        return default if value is None else value

    # Largest defined entry count among the input types being checked
    # ('UNDEFINED' counts are ignored).
    inputEvents = 0
    dt = ""
    if self._inputDataTypeCountCheck is None:
        self._inputDataTypeCountCheck = input
    for dataType in self._inputDataTypeCountCheck:
        thisInputEvents = self.conf.dataDictionary[dataType].nentries
        if thisInputEvents == 'UNDEFINED':
            continue
        if thisInputEvents > inputEvents:
            inputEvents = thisInputEvents
            dt = dataType

    mySkipEvents = _substepArgValue('skipEvents', 0)
    myMaxEvents = _substepArgValue('maxEvents', -1)

    if self._inputEventTest and mySkipEvents > 0 and mySkipEvents >= inputEvents:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
            'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(mySkipEvents, inputEvents, dt))

    # Number of events this substep should process; used below to decide
    # whether AthenaMP is worth enabling.
    try:
        if myMaxEvents != -1:
            if self.inData and next(iter(self.inData)) == 'inNULL':
                # No real input data: maxEvents alone sets the count
                expectedEvents = myMaxEvents
            else:
                expectedEvents = min(inputEvents - mySkipEvents, myMaxEvents)
        else:
            expectedEvents = inputEvents - mySkipEvents
    except TypeError:
        # Non-numeric event counts cannot be combined arithmetically
        msg.info('input event count is UNDEFINED, setting expectedEvents to 0')
        expectedEvents = 0

    # --- Release and container OS setup ---
    OSSetupString = None
    asetupString = None
    legacyThreadingRelease = False
    if 'asetup' in self.conf.argdict:
        asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        legacyThreadingRelease = asetupReleaseIsOlderThan(asetupString, 22)
    else:
        msg.info('Asetup report: {0}'.format(asetupReport()))

    if asetupString is not None:
        legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
        currentOS = os.environ['ALRB_USER_PLATFORM']
        if legacyOSRelease and "centos7" not in currentOS:
            OSSetupString = "centos7"
            msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))

    # An explicit --runInContainer value wins, but only when it actually has a
    # value for this substep; otherwise keep any OS chosen above.
    requestedOS = _substepArgValue('runInContainer')
    if requestedOS is not None:
        OSSetupString = requestedOS
        msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
    if OSSetupString is not None and asetupString is None:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
            '--asetup must be used for the substep which requires --runInContainer')

    # --- AthenaMT / AthenaMP configuration ---
    if self._onlyMPWithRunargs and any(k in self.conf._argdict for k in self._onlyMPWithRunargs):
        self._onlyMP = True

    mtRequested = 'multithreaded' in self.conf._argdict and self.conf._argdict['multithreaded'].value
    mpRequested = 'multiprocess' in self.conf._argdict and self.conf._argdict['multiprocess'].value
    if (mtRequested or mpRequested) and 'ATHENA_CORE_NUMBER' not in os.environ:
        msg.warning('either --multithreaded or --multiprocess argument used but ATHENA_CORE_NUMBER environment not set. Athena will continue in Serial mode')
    else:
        if not self._disableMT:
            self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict, self.name, legacyThreadingRelease)

        if not self._disableMP:
            self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease)

        # Configurations restricted to one parallelism mode: move the
        # requested worker count over to the supported mode.
        if self._onlyMP and self._athenaMT > 0:
            msg.info("This configuration does not support MT, falling back to MP")
            if self._athenaMP == 0:
                self._athenaMP = self._athenaMT
            self._athenaMT = 0
            self._athenaConcurrentEvents = 0

        if self._onlyMT and self._athenaMP > 0:
            msg.info("This configuration does not support MP, using MT")
            if self._athenaMT == 0:
                self._athenaMT = self._athenaMP
                self._athenaConcurrentEvents = self._athenaMP
            self._athenaMP = 0

    # More workers than events is pointless (except with no real input data)
    if not self._disableMP and expectedEvents < self._athenaMP and self._inData != {'inNULL'}:
        msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
        self._disableMP = True
        self._athenaMP = 0

    # --- Output file naming for multi-step executors ---
    if self.conf.totalExecutorSteps > 1:
        for dataType in output:
            dataArg = self.conf._dataDictionary[dataType]
            if dataArg.originalName:
                dataArg.value[0] = dataArg.originalName
            else:
                dataArg.originalName = dataArg.value[0]
            dataArg.value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
            msg.info("Updated athena output filename for {0} to {1}".format(dataType, dataArg.value[0]))

    # --- AthenaMP bookkeeping ---
    if self._athenaMP > 0:
        self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
        self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep)
        self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name)
        self._athenaMPReadEventOrders = ('athenaMPUseEventOrders' in self.conf.argdict and
                                         self.conf._argdict['athenaMPUseEventOrders'].value is True)
        self._athenaMPStrategy = _substepArgValue('athenaMPStrategy', 'SharedQueue')

        # Merge target sizes are given in MB: exact data type first, then the
        # first matching glob, then the "ALL" entry.
        if 'athenaMPMergeTargetSize' in self.conf.argdict:
            mergeTargetSizes = self.conf.argdict['athenaMPMergeTargetSize'].value
            for dataType in output:
                dataArg = self.conf._dataDictionary[dataType]
                if dataType in mergeTargetSizes:
                    dataArg.mergeTargetSize = mergeTargetSizes[dataType] * 1000000
                    msg.info('Set target merge size for {0} to {1}'.format(dataType, dataArg.mergeTargetSize))
                    continue
                matchedViaGlob = False
                for mtsType, mtsSize in mergeTargetSizes.items():
                    if fnmatch(dataType, mtsType):
                        dataArg.mergeTargetSize = mtsSize * 1000000
                        msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, dataArg.mergeTargetSize, mtsType))
                        matchedViaGlob = True
                        break
                if not matchedViaGlob and "ALL" in mergeTargetSizes:
                    dataArg.mergeTargetSize = mergeTargetSizes["ALL"] * 1000000
                    msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, dataArg.mergeTargetSize))

        for dataType in output:
            dataArg = self.conf._dataDictionary[dataType]
            if self.conf.totalExecutorSteps <= 1:
                dataArg.originalName = dataArg.value[0]
            if 'eventService' not in self.conf.argdict or self.conf.argdict['eventService'].value is False:
                if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
                    msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
                else:
                    dataArg.value[0] += "_000"
                    msg.info("Updated athena output filename for {0} to {1}".format(dataType, dataArg.value[0]))
    else:
        self._athenaMPWorkerTopDir = self._athenaMPFileReport = None

    if 'mpi' in self.conf.argdict:
        msg.info("Running in MPI mode")
        mpi.setupMPIConfig(output, self.conf.dataDictionary)

    # --- Job options from the skeleton template ---
    if self._skeleton or self._skeletonCA:
        inputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in input}
        outputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in output}

        # Add any extra input file arguments attached to this executor
        nameForFiles = commonExecutorStepName(self._name)
        for dataArg in self.conf.dataDictionary.values():
            if isinstance(dataArg, list):
                if not dataArg:
                    # An empty per-step list carries no files; skip it rather
                    # than treating the list itself as a single argument.
                    continue
                if self.conf.totalExecutorSteps <= 1:
                    raise ValueError('Multiple input arguments provided but only running one substep')
                if self.conf.totalExecutorSteps != len(dataArg):
                    raise ValueError(f'{len(dataArg)} input arguments provided but running {self.conf.totalExecutorSteps} substeps')

                stepArg = dataArg[self.conf.executorStep]
                if stepArg.io == 'input' and nameForFiles in stepArg.executor:
                    inputFiles[stepArg.subtype] = dataArg
            elif dataArg.io == 'input' and nameForFiles in dataArg.executor:
                inputFiles[dataArg.subtype] = dataArg

        msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

        self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input=inputFiles,
                                                                       output=outputFiles)

    if input:
        self._extraMetadata['inputs'] = list(input)
    if output:
        self._extraMetadata['outputs'] = list(output)

    # --- DBRelease setup ---
    dbrelease = dbsetup = None
    if 'DBRelease' in self.conf.argdict:
        dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        # returnMyValue may yield None; path.islink(None) raises TypeError
        if dbrelease and path.islink(dbrelease):
            dbrelease = path.realpath(dbrelease)
        if dbrelease:
            dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(dbrelease))
            if dbdMatch:
                msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
                if not os.access(dbrelease, os.R_OK):
                    msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                    msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                    dbrelease = dbdMatch.group(1)
                    dbsetup = cvmfsDBReleaseCheck(dbrelease)
                else:
                    msg.debug('Setting up {0} from {1}'.format(dbdMatch.group(1), dbrelease))
                    unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                    if unpacked:
                        setupDBRelease(dbsetup)
            else:
                # Not a tarball name: treat the value as a release to look up
                dbsetup = cvmfsDBReleaseCheck(dbrelease)

    # --- Environment, command line and wrapper ---
    self._envUpdate = trfEnv.environmentUpdate()
    self._envUpdate.setStandardEnvironment(self.conf.argdict, name=self.name, substep=self.substep)
    self._prepAthenaCommandLine()

    super(athenaExecutor, self).preExecute(input, output)

    msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
    self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
    msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
1242
1243