40 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.
name, input, output))
43 self.
_exe =
'setsid ' + self.conf.argdict[
'trigExe'].value
46 if (self._inputEventTest
and 'skipEvents' in self.conf.argdict
and
47 self.conf.argdict[
'skipEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.conf.firstExecutor)
is not None):
48 msg.debug(
'Will test for events to process')
49 for dataType
in input:
50 inputEvents = self.conf.dataDictionary[dataType].nentries
51 msg.debug(
'Got {} events for {}'.format(inputEvents, dataType))
52 if not isinstance(inputEvents, int):
53 msg.warning(
'Are input events countable? Got nevents={} so disabling event count check for this input'.format(inputEvents))
54 elif self.conf.argdict[
'skipEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.conf.firstExecutor) >= inputEvents:
56 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(self.conf.argdict[
'skipEvents'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.conf.firstExecutor), inputEvents, dataType))
61 for dataType
in input:
62 inputFiles[dataType] = self.conf.dataDictionary[dataType]
64 for dataType
in output:
65 outputFiles[dataType] = self.conf.dataDictionary[dataType]
68 for dataType, dataArg
in self.conf.dataDictionary.items():
69 if dataArg.io ==
'input' and self.
_name in dataArg.executor:
70 inputFiles[dataArg.subtype] = dataArg
72 msg.info(
'Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
81 self._extraMetadata[
'inputs'] = list(input)
83 self._extraMetadata[
'outputs'] = list(output)
87 if 'asetup' in self.conf.argdict:
88 asetupString = self.conf.argdict[
'asetup'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.conf.firstExecutor)
90 msg.info(
'Asetup report: {0}'.format(asetupReport()))
94 if asetupString
is not None:
95 legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
96 currentOS = os.environ[
'ALRB_USER_PLATFORM']
97 if legacyOSRelease
and "centos7" not in currentOS:
98 OSSetupString =
"centos7"
99 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.format(self.
_substep, OSSetupString))
102 if 'runInContainer' in self.conf.argdict:
103 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.conf.firstExecutor)
104 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.format(self.
_substep, OSSetupString))
107 dbrelease = dbsetup =
None
108 if 'DBRelease' in self.conf.argdict:
109 dbrelease = self.conf.argdict[
'DBRelease'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.conf.firstExecutor)
112 dbdMatch = re.match(
r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
114 msg.debug(
'DBRelease setting {0} matches classic tarball file'.format(dbrelease))
115 if not os.access(dbrelease, os.R_OK):
116 msg.warning(
'Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
117 msg.warning(
'I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
118 dbrelease = dbdMatch.group(1)
119 dbsetup = cvmfsDBReleaseCheck(dbrelease)
122 unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
125 setupDBRelease(dbsetup)
128 dbsetup = cvmfsDBReleaseCheck(dbrelease)
137 if 'athenaHLT' in self.
_exe or 'athenaEF' in self.
_exe:
138 self.
_cmd.remove(
'runargs.BSRDOtoRAW.py')
140 optionList = getTranslated(self.conf.argdict, name=self.
_name, substep=self.
_substep, first=self.conf.firstExecutor, output = outputFiles)
141 self.
_cmd.extend(optionList)
143 if self._isCAEnabled():
144 msg.info(
"Running in CA mode")
147 if 'preExec' in self.conf.argdict:
148 self.
_cmd.extend(self.conf.argdict[
'preExec'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.conf.firstExecutor))
149 msg.info(
'Command adjusted for CA to %s', self.
_cmd)
151 msg.info(
"Running in legacy mode")
154 if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
156 dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary[
'BS_RDO'], self.conf.dataDictionary[
'HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
159 if asetupString
is None and dbgAsetupString
is not None:
160 asetupString = dbgAsetupString
161 msg.info(
'Will use asetup string for debug stream analysis %s', dbgAsetupString)
163 legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
164 currentOS = os.environ[
'ALRB_USER_PLATFORM']
165 if legacyOSRelease
and "centos7" not in currentOS:
166 OSSetupString =
"centos7"
167 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.format(self.
_substep, OSSetupString))
169 if 'runInContainer' in self.conf.argdict:
170 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self.
_name, substep=self.
_substep, first=self.conf.firstExecutor)
171 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.format(self.
_substep, OSSetupString))
174 if 'useDB' in self.conf.argdict
and 'DBserver' not in self.conf.argdict
and dbAlias:
175 msg.warn(
"Database alias will be set to %s", dbAlias)
176 self.
_cmd.append(
"--db-server " + dbAlias)
178 msg.info(
"Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")
182 if 'BS' in self.conf.dataDictionary
or 'DRAW_TRIGCOST' in self.conf.dataDictionary
or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
186 if len(matchedOutputFileNames) > 0:
187 msg.error(f
'Directoy already contains files with expected output name format ({self.expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
189 f
'Directory already contains files with expected output name format {self.expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')
192 if OSSetupString
is not None and asetupString
is None:
194 'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')
197 super(athenaExecutor, self).
preExecute(input, output)
202 msg.info(
'Now writing wrapper for substep executor {0}'.format(self.
_name))
203 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
204 msg.info(
'Athena will be executed in a subshell via {0}'.format(self.
_cmd))
293 def _splitBSfile(self, streamsList, allStreamsFileName, splitFileName):
# Extract the stream(s) named in streamsList from the bytestream file
# allStreamsFileName by running trigbs_extractStream.py, then rename the
# first output file matching the expected name pattern to splitFileName.
#   streamsList        -- stream names; joined with ',' for the -s option
#   allStreamsFileName -- input BS file handed to trigbs_extractStream.py
#   splitFileName      -- name the extracted file is renamed to
# NOTE(review): this extract drops original lines 294, 298, 303-304,
# 308-310, 313-314, 317-318 and everything after 319. Those include the
# try/except framing, the assignment of matchedOutputFileName and any
# return statements, so the return contract cannot be confirmed from here.
295 outputStreams =
','.join(str(stream)
for stream
in streamsList)
296 msg.info(
'Splitting stream %s from BS file', outputStreams)
297 splitStreamFailure = 0
# NOTE(review): the command is built as a shell string (shell=True) with
# the file name and stream list interpolated unquoted. A name containing
# spaces or shell metacharacters would break or alter the command.
# Consider passing an argument list without shell=True.
299 cmd = f
'trigbs_extractStream.py -s {outputStreams} {allStreamsFileName}'
300 msg.info(
'running command for splitting (in original asetup env): %s', cmd)
301 splitStreamFailure = subprocess.call(cmd, shell=
True)
302 msg.debug(
'trigbs_extractStream.py splitting return code %s', splitStreamFailure)
# Error text for an exception around the splitter call. `e` is bound by an
# except clause (original lines 303-304) that is not present in this extract.
305 'Exception raised when selecting stream with trigbs_extractStream.py in file {0}: {1}'.format(allStreamsFileName, e))
# A non-zero return code from the script is only logged as a warning here.
306 if splitStreamFailure != 0:
307 msg.warning(
'trigbs_extractStream.py returned error (%s) no split BS file created', splitStreamFailure)
# Expected output name uses the stream name when exactly one stream was
# requested, and 'accepted' otherwise.
311 streamName = outputStreams
if len(streamsList)==1
else 'accepted'
312 expectedStreamFileName = f
'*_{streamName}.*.RAW._*.data'
# matchedOutputFileName is presumably the list of files matching the
# pattern above. Its assignment (original line 313) is missing from this
# extract. Only the first match is renamed.
315 if(len(matchedOutputFileName)):
316 self.
_renamefile(matchedOutputFileName[0], splitFileName)
# NOTE(review): log text reads 'did not created' (typo for 'did not create');
# left unchanged because it is runtime output.
319 msg.error(
'trigbs_extractStream.py did not created expected file (%s)', expectedStreamFileName)
339 msg.debug(
'Now scanning logfile {0} for HLTMPPU Child Issues'.format(log))
348 myGen = lineByLine(log, substepName=self.
_substep)
350 msg.error(
'Failed to open transform logfile {0}: {1:s}'.format(log, e))
351 for line, lineCounter
in myGen:
353 if 'Child Issue' in line:
355 signal = int((re.search(
'signal ([0-9]*)', line)).group(1))
356 except AttributeError:
359 msg.error(
'Detected issue with HLTChild, setting mother return code to %s', signal)
368 for file
in os.listdir(
'.'):
370 if fnmatch.fnmatch(file,
'athenaHLT:*'):
371 msg.info(
'Merging child log file (%s) into %s', file, self.
_logFileName)
372 with open(file)
as log_file:
374 merged_file.write(
'### Output from {} ###\n'.format(file))
376 for line
in log_file:
377 merged_file.write(line)
379 if 'rejected:' in line
and int(line[14]) != 0:
381 rejected += int(line[14:])
383 if 'accepted:' in line
and int(line[14]) != 0:
385 accepted += int(line[14:])
386 if re.search(
'DFDcmEmuSession.* Communication error', line)
or re.search(
'DFDcmEmuSession.* No new event provided within the timeout limit', line):
387 msg.error(
'Caught DFDcmEmuSession error, aborting job')
390 if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
392 dbgStream.getHltDecision(accepted, rejected, self.conf.argdict[
"outputHIST_DEBUGSTREAMMONFile"].value[0])
396 'Exception raised when merging log files into {0}: {1}'.format(self.
_logFileName, e))
398 msg.info(
"Check for expert-monitoring.root file")
403 expectedFileName =
'expert-monitoring.root'
407 msg.info(
'HLT step failed (with status %s) so skip HIST_HLTMON filename check', self.
_rc)
409 elif 'outputHIST_HLTMONFile' in self.conf.argdict:
412 expectedMotherFileName =
'expert-monitoring-mother.root'
413 if(os.path.isfile(expectedFileName)):
414 msg.info(
'Renaming %s to %s', expectedFileName, expectedMotherFileName)
416 os.rename(expectedFileName, expectedMotherFileName)
419 'Exception raised when renaming {0} to {1}: {2}'.format(expectedFileName, expectedMotherFileName, e))
421 msg.error(
'HLTMON argument defined but mother %s not created', expectedFileName)
424 expectedWorkerFileName =
'athenaHLT_workers/athenaHLT-01/' + expectedFileName
425 if(os.path.isfile(expectedWorkerFileName)
and os.path.isfile(expectedMotherFileName)):
426 msg.info(
'Merging worker and mother %s files to %s', expectedFileName, self.conf.argdict[
'outputHIST_HLTMONFile'].value[0])
429 cmd =
'hadd ' + self.conf.argdict[
'outputHIST_HLTMONFile'].value[0] +
' athenaHLT_workers/*/expert-monitoring.root expert-monitoring-mother.root'
430 subprocess.call(cmd, shell=
True)
433 'Exception raised when merging worker and mother {0} files to {1}: {2}'.format(expectedFileName, self.conf.argdict[
'outputHIST_HLTMONFile'].value[0], e))
435 msg.error(
'HLTMON argument defined %s but worker %s not created', self.conf.argdict[
'outputHIST_HLTMONFile'].value[0], expectedFileName)
438 msg.info(
'HLTMON argument not defined so skip %s check', expectedFileName)
440 msg.info(
"Search for created BS files, and rename if single file found")
446 msg.error(
'HLT step failed (with status %s) so skip BS filename check', self.
_rc)
447 elif 'BS' in self.conf.dataDictionary
or 'DRAW_TRIGCOST' in self.conf.dataDictionary
or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
453 if len(matchedOutputFileNames) == 0:
454 msg.error(
'No BS files created with expected name: %s - please check for earlier failures', self.
expectedOutputFileName)
456 f
'No BS files created with expected name: {self.expectedOutputFileName} - please check for earlier failures')
459 if len(matchedOutputFileNames) == 1:
460 msg.info(
'Single BS file found: will split (if requested) and rename the file')
461 BSFile = matchedOutputFileNames[0]
465 msg.info(
'Multiple BS files found. A single BS file is required by the next transform steps, so they will be merged. Will split the merged file (if requested) and rename the file')
466 mergedBSFile = matchedOutputFileNames[0].
split(
'._0001.data')[0] +
'.mrg._0001.data'
468 mergeFailed = self.
_mergeBSfiles(matchedOutputFileNames, mergedBSFile)
471 'Did not produce a merged BS file with file_merging')
473 BSFile =
'tmp.BS.mrg'
474 msg.info(f
'Renaming temporary merged BS file to {BSFile}')
478 if 'DRAW_TRIGCOST' in self.conf.dataDictionary:
479 splitFailed = self.
_splitBSfile([
'CostMonitoring'], BSFile, self.conf.dataDictionary[
'DRAW_TRIGCOST'].value[0])
482 'Did not produce any BS file when selecting CostMonitoring stream with trigbs_extractStream.py in file')
485 if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
489 if 'BS' in self.conf.dataDictionary:
490 argInDict = self.conf.dataDictionary[
'BS']
492 if 'streamSelection' in self.conf.argdict
and self.conf.argdict[
'streamSelection'].value[0] !=
"All":
493 splitEmpty = self.
_splitBSfile(self.conf.argdict[
'streamSelection'].value, BSFile, argInDict.value[0])
495 msg.info(
'Did not produce any BS file when selecting stream with trigbs_extractStream.py in file')
499 cmd_splitFailed =
'trigbs_failedStreamSelection.py ' + BSFile
500 msg.info(
'running command for creating empty file: %s', cmd_splitFailed)
501 subprocess.call(cmd_splitFailed, shell=
True)
504 runnumber = eformat.EventStorage.pickDataReader(BSFile).runNumber()
505 expectedOutputFileName =
'T0debug.00'+str(runnumber)+
'.unknown_debug.unknown.RAW._lb0000._TRF._0001.data'
507 self.
_renamefile(expectedOutputFileName, argInDict.value[0])
509 msg.info(
'Stream "All" requested, so not splitting BS file')
512 msg.info(
'BS output filetype not defined so skip renaming BS')
514 msg.info(
'BS, DRAW_TRIGCOST or HIST_DEBUGSTREAMMON output filetypes not defined so skip BS post processing')
516 msg.info(
'Now run athenaExecutor:postExecute')
519 if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
521 fileNameDbg = self.conf.argdict[
"outputHIST_DEBUGSTREAMMONFile"].value
522 dbgStream.dbgPostRun(argInDict.value[0], fileNameDbg[0], self.conf.argdict, isSplitStream=
True)