17 from PyJobTransforms.trfUtils import asetupReport, cvmfsDBReleaseCheck, unpackDBRelease, setupDBRelease, lineByLine, asetupReleaseIsOlderThan
18 import PyJobTransforms.trfEnv
as trfEnv
19 import PyJobTransforms.trfExceptions
as trfExceptions
21 import TrigTransform.dbgAnalysis
as dbgStream
22 from TrigTransform.trigTranslate
import getTranslated
as getTranslated
25 import logging, eformat
26 msg = logging.getLogger(
"PyJobTransforms." + __name__)
37 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.
format(self.name, input, output))
40 if (self._inputEventTest
and 'skipEvents' in self.conf.argdict
and
41 self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
is not None):
42 msg.debug(
'Will test for events to process')
43 for dataType
in input:
44 inputEvents = self.conf.dataDictionary[dataType].nentries
45 msg.debug(
'Got {} events for {}'.
format(inputEvents, dataType))
46 if not isinstance(inputEvents, int):
47 msg.warning(
'Are input events countable? Got nevents={} so disabling event count check for this input'.
format(inputEvents))
48 elif self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) >= inputEvents:
50 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.
format(self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor), inputEvents, dataType))
53 if self._skeleton
is not None:
55 for dataType
in input:
56 inputFiles[dataType] = self.conf.dataDictionary[dataType]
58 for dataType
in output:
59 outputFiles[dataType] = self.conf.dataDictionary[dataType]
62 for dataType, dataArg
in self.conf.dataDictionary.items():
63 if dataArg.io ==
'input' and self._name
in dataArg.executor:
64 inputFiles[dataArg.subtype] = dataArg
66 msg.info(
'Input Files: {0}; Output Files: {1}'.
format(inputFiles, outputFiles))
75 self._extraMetadata[
'inputs'] =
list(input)
77 self._extraMetadata[
'outputs'] =
list(output)
81 if 'asetup' in self.conf.argdict:
82 asetupString = self.conf.argdict[
'asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
88 if asetupString
is not None:
90 currentOS = os.environ[
'ALRB_USER_PLATFORM']
91 if legacyOSRelease
and "centos7" not in currentOS:
92 OSSetupString =
"centos7"
93 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.
format(self._substep, OSSetupString))
96 if 'runInContainer' in self.conf.argdict:
97 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
98 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.
format(self._substep, OSSetupString))
101 dbrelease = dbsetup =
None
102 if 'DBRelease' in self.conf.argdict:
103 dbrelease = self.conf.argdict[
'DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
106 dbdMatch = re.match(
r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
108 msg.debug(
'DBRelease setting {0} matches classic tarball file'.
format(dbrelease))
109 if not os.access(dbrelease, os.R_OK):
110 msg.warning(
'Transform was given tarball DBRelease file {0}, but this is not there'.
format(dbrelease))
111 msg.warning(
'I will now try to find DBRelease {0} in cvmfs'.
format(dbdMatch.group(1)))
112 dbrelease = dbdMatch.group(1)
116 unpacked, dbsetup =
unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
131 if 'athenaHLT' in self._exe:
132 self._cmd.
remove(
'runargs.BSRDOtoRAW.py')
134 optionList =
getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output = outputFiles)
135 self._cmd.
extend(optionList)
137 if self._isCAEnabled():
138 msg.info(
"Running in CA mode")
140 self._cmd.
append(self._skeletonCA)
141 if 'preExec' in self.conf.argdict:
142 self._cmd.
extend(self.conf.argdict[
'preExec'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor))
143 msg.info(
'Command adjusted for CA to %s', self._cmd)
145 msg.info(
"Running in legacy mode")
148 if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
150 dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary[
'BS_RDO'], self.conf.dataDictionary[
'HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
153 if asetupString
is None and dbgAsetupString
is not None:
154 asetupString = dbgAsetupString
155 msg.info(
'Will use asetup string for debug stream analysis %s', dbgAsetupString)
158 currentOS = os.environ[
'ALRB_USER_PLATFORM']
159 if legacyOSRelease
and "centos7" not in currentOS:
160 OSSetupString =
"centos7"
161 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.
format(self._substep, OSSetupString))
163 if 'runInContainer' in self.conf.argdict:
164 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
165 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.
format(self._substep, OSSetupString))
168 if 'useDB' in self.conf.argdict
and 'DBserver' not in self.conf.argdict
and dbAlias:
169 msg.warn(
"Database alias will be set to %s", dbAlias)
170 self._cmd.
append(
"--db-server " + dbAlias)
172 msg.info(
"Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")
176 if 'BS' in self.conf.dataDictionary
or 'DRAW_TRIGCOST' in self.conf.dataDictionary
or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
178 expectedOutputFileName =
'*_HLTMPPy_*.data'
182 if len(matchedOutputFileNames) > 0:
183 msg.error(f
'Directoy already contains files with expected output name format ({expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
185 f
'Directory already contains files with expected output name format {expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')
188 if OSSetupString
is not None and asetupString
is None:
190 'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')
193 super(athenaExecutor, self).
preExecute(input, output)
198 msg.info(
'Now writing wrapper for substep executor {0}'.
format(self._name))
199 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
200 msg.info(
'Athena will be executed in a subshell via {0}'.
format(self._cmd))
205 msg.info(
"Before build command line check if reading from DB")
208 removeSkeleton =
False
209 if 'useDB' in self.conf.argdict:
210 removeSkeleton =
True
211 elif 'athenaopts' in self.conf.argdict:
212 v = self.conf.argdict[
'athenaopts'].value
213 if '--use-database' in v
or '-b' in v:
214 removeSkeleton =
True
216 if removeSkeleton
and self._isCAEnabled():
217 msg.error(
'Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
219 'Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
224 if removeSkeleton
and self._skeleton
is not None:
225 msg.info(
'Read from DB: remove skeleton {0} from command'.
format(self._skeleton))
228 msg.info(
'Not reading from DB: keep skeleton in command')
# NOTE(review): fragment of an output-file discovery helper; the `def` line is
# elided from this view, but the body reads a `pattern` parameter, so this is
# presumably something like `_findOutputFiles(self, pattern)` — TODO confirm.
# Behavior shown by the visible lines: collect the first value of every
# 'input' data argument so input files are not mistaken for outputs, then scan
# the current working directory for files matching `pattern` and return the
# matches that are not input files.
# NOTE(review): the embedded numbers (236, 238, ...) and mid-statement line
# breaks are extraction damage; lines 237, 242, 247 of the original are elided
# here (247 presumably a `continue`/`else` for the ignore branch — confirm).
236 matchedOutputFileNames = []
# Gather file names of all 'input' data arguments so they can be skipped below.
238 ignoreInputFileNames = []
239 for dataType, dataArg
in self.conf.dataDictionary.items():
240 if dataArg.io ==
'input':
241 ignoreInputFileNames.append(dataArg.value[0])
# Scan the working directory for candidate output files.
243 for file
in os.listdir(
'.'):
244 if fnmatch.fnmatch(file, pattern):
245 if file
in ignoreInputFileNames:
246 msg.info(
'Ignoring input file: %s', file)
248 matchedOutputFileNames.append(file)
249 return matchedOutputFileNames
# NOTE(review): fragment of a BS-file merge helper; the `def` line is elided
# from this view. The body reads `inputFiles` and `outputFile` parameters, so
# this is presumably `_mergeBSfiles(self, inputFiles, outputFile)` — TODO
# confirm. The original `try:` / `except` / `raise` scaffolding around the
# file-list writing and the merge call is elided (orig lines 256, 260–261,
# 263–264, 266, 268–269, 271–273, 278–279 missing), so the bare f-strings at
# orig 262/267/280 are the messages of elided raise/exception statements.
253 msg.info(f
'Merging multiple BS files ({inputFiles}) into {outputFile}')
# Write the list of input BS files to a text file consumed by file_merging.
255 mergeBSFileList =
'RAWFileMerge.list'
257 with open(mergeBSFileList,
'w')
as BSFileList:
258 for fname
in inputFiles:
259 BSFileList.write(f
'{fname}\n')
# (elided exception path) message for a failure while writing the list file:
262 f
'Got an error when writing list of BS files to {mergeBSFileList}: {e}')
# file_merging requires the output name to end in '._0001.data'; the suffix
# is stripped here because the tool appends it itself — TODO confirm.
265 if not outputFile.endswith(
'._0001.data'):
267 'Output merged BS filename must end in "._0001.data"')
270 outputFile = outputFile.split(
'._0001.data')[0]
# Run the external merge tool in the original (non-asetup) environment.
274 cmd = f
'file_merging {mergeBSFileList} 0 {outputFile}'
275 msg.info(
'running command for merging (in original asetup env): %s', cmd)
276 mergeBSFailure = subprocess.call(cmd, shell=
True)
277 msg.debug(
'file_merging return code %s', mergeBSFailure)
# (elided exception path) message for a failure raised by the merge step:
280 f
'Exception raised when merging BS files using file_merging: {e}')
# Non-zero return code from file_merging means no merged file was produced.
281 if mergeBSFailure != 0:
282 msg.error(
'file_merging returned error (%s) no merged BS file created', mergeBSFailure)
# Split the requested trigger streams out of a merged BS (bytestream) file
# using the external trigbs_extractStream.py tool, then hand the produced
# file to the rename helper.
# NOTE(review): interior lines of the original are elided here (orig 290, 294,
# 299–300, 304–306, 308–309, 312–313), including the `try:`/`except` wrapping
# of the subprocess call and the step that fills `matchedOutputFileName`
# (presumably a directory scan for `expectedStreamFileName` — TODO confirm).
289 def _splitBSfile(self, streamsList, allStreamsFileName, splitFileName):
# Comma-separated stream list as expected by trigbs_extractStream.py -s.
291 outputStreams =
','.
join(
str(stream)
for stream
in streamsList)
292 msg.info(
'Splitting stream %s from BS file', outputStreams)
293 splitStreamFailure = 0
# Run the external splitter in the original (non-asetup) environment.
295 cmd =
'trigbs_extractStream.py -s ' + outputStreams +
' ' + allStreamsFileName
296 msg.info(
'running command for splitting (in original asetup env): %s', cmd)
297 splitStreamFailure = subprocess.call(cmd, shell=
True)
298 msg.debug(
'trigbs_extractStream.py splitting return code %s', splitStreamFailure)
# (elided exception path) message of the elided raise for a failed subprocess:
301 'Exception raised when selecting stream with trigbs_extractStream.py in file {0}: {1}'.
format(allStreamsFileName, e))
# Non-zero return code means no split file was produced; only a warning here.
302 if splitStreamFailure != 0:
303 msg.warning(
'trigbs_extractStream.py returned error (%s) no split BS file created', splitStreamFailure)
# Expected name pattern of the file the splitter writes; the lookup that
# produces `matchedOutputFileName` from it is elided from this view.
307 expectedStreamFileName =
'*_athenaHLT*.data'
310 if(len(matchedOutputFileName)):
# Rename the first (presumably only) produced file to the requested name.
311 self.
_renamefile(matchedOutputFileName[0], splitFileName)
# (elided `else:` presumed at orig 312–313) error when no file matched:
314 msg.error(
'trigbs_extractStream.py did not created expected file (%s)', expectedStreamFileName)
# NOTE(review): fragment of a file-rename helper; the `def` line is elided
# from this view, but the body reads `currentFileName` and `newFileName`, so
# this is presumably `_renamefile(self, currentFileName, newFileName)` — TODO
# confirm. The `try:`/`except` keywords and the `raise` statement around
# os.rename are elided (orig 320, 322, 324 missing); orig 325 is the message
# of that elided raise.
# NOTE(review): the '#to' inside the two message strings looks like a stray
# character from the extraction (expected 'to') — flagged only, not changed,
# since these are runtime strings.
319 msg.info(
'Renaming file from %s to %s', currentFileName, newFileName)
321 os.rename(currentFileName, newFileName)
# (elided exception path) log and re-raise on rename failure:
323 msg.error(
'Exception raised when renaming {0} #to {1}: {2}'.
format(currentFileName, newFileName, e))
325 'Exception raised when renaming {0} #to {1}: {2}'.
format(currentFileName, newFileName, e))
333 log = self._logFileName
334 msg.debug(
'Now scanning logfile {0} for HLTMPPU Child Issues'.
format(log))
343 myGen =
lineByLine(log, substepName=self._substep)
345 msg.error(
'Failed to open transform logfile {0}: {1:s}'.
format(log, e))
346 for line, lineCounter
in myGen:
348 if 'Child Issue' in line:
350 signal =
int((re.search(
'signal ([0-9]*)', line)).
group(1))
351 except AttributeError:
354 msg.error(
'Detected issue with HLTChild, setting mother return code to %s', signal)
362 with open(self._logFileName,
'a')
as merged_file:
363 for file
in os.listdir(
'.'):
365 if fnmatch.fnmatch(file,
'athenaHLT:*'):
366 msg.info(
'Merging child log file (%s) into %s', file, self._logFileName)
367 with open(file)
as log_file:
369 merged_file.write(
'### Output from {} ###\n'.
format(file))
371 for line
in log_file:
372 merged_file.write(line)
374 if 'rejected:' in line
and int(line[14]) != 0:
376 rejected +=
int(line[14:])
378 if 'accepted:' in line
and int(line[14]) != 0:
380 accepted +=
int(line[14:])
381 if re.search(
'DFDcmEmuSession.* Communication error', line)
or re.search(
'DFDcmEmuSession.* No new event provided within the timeout limit', line):
382 msg.error(
'Caught DFDcmEmuSession error, aborting job')
385 if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
387 dbgStream.getHltDecision(accepted, rejected, self.conf.argdict[
"outputHIST_DEBUGSTREAMMONFile"].value[0])
391 'Exception raised when merging log files into {0}: {1}'.
format(self._logFileName, e))
393 msg.info(
"Check for expert-monitoring.root file")
398 expectedFileName =
'expert-monitoring.root'
402 msg.info(
'HLT step failed (with status %s) so skip HIST_HLTMON filename check', self.
_rc)
404 elif 'outputHIST_HLTMONFile' in self.conf.argdict:
407 expectedMotherFileName =
'expert-monitoring-mother.root'
408 if(os.path.isfile(expectedFileName)):
409 msg.info(
'Renaming %s to %s', expectedFileName, expectedMotherFileName)
411 os.rename(expectedFileName, expectedMotherFileName)
414 'Exception raised when renaming {0} to {1}: {2}'.
format(expectedFileName, expectedMotherFileName, e))
416 msg.error(
'HLTMON argument defined but mother %s not created', expectedFileName)
419 expectedWorkerFileName =
'athenaHLT_workers/athenaHLT-01/' + expectedFileName
420 if(os.path.isfile(expectedWorkerFileName)
and os.path.isfile(expectedMotherFileName)):
421 msg.info(
'Merging worker and mother %s files to %s', expectedFileName, self.conf.argdict[
'outputHIST_HLTMONFile'].value[0])
424 cmd =
'hadd ' + self.conf.argdict[
'outputHIST_HLTMONFile'].value[0] +
' athenaHLT_workers/*/expert-monitoring.root expert-monitoring-mother.root'
425 subprocess.call(cmd, shell=
True)
428 'Exception raised when merging worker and mother {0} files to {1}: {2}'.
format(expectedFileName, self.conf.argdict[
'outputHIST_HLTMONFile'].value[0], e))
430 msg.error(
'HLTMON argument defined %s but worker %s not created', self.conf.argdict[
'outputHIST_HLTMONFile'].value[0], expectedFileName)
433 msg.info(
'HLTMON argument not defined so skip %s check', expectedFileName)
435 msg.info(
"Search for created BS files, and rename if single file found")
441 msg.error(
'HLT step failed (with status %s) so skip BS filename check', self.
_rc)
442 elif 'BS' in self.conf.dataDictionary
or 'DRAW_TRIGCOST' in self.conf.dataDictionary
or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
444 expectedOutputFileName =
'*_HLTMPPy_*.data'
450 if len(matchedOutputFileNames) == 0:
451 msg.error(
'No BS files created with expected name: %s - please check for earlier failures', expectedOutputFileName)
453 f
'No BS files created with expected name: {expectedOutputFileName} - please check for earlier failures')
456 if len(matchedOutputFileNames) == 1:
457 msg.info(
'Single BS file found: will split (if requested) and rename the file')
458 BSFile = matchedOutputFileNames[0]
462 msg.info(
'Multiple BS files found. A single BS file is required by the next transform steps, so they will be merged. Will split the merged file (if requested) and rename the file')
463 mergedBSFile = matchedOutputFileNames[0].
split(
'._0001.data')[0] +
'.mrg._0001.data'
465 mergeFailed = self.
_mergeBSfiles(matchedOutputFileNames, mergedBSFile)
468 'Did not produce a merged BS file with file_merging')
470 BSFile =
'tmp.BS.mrg'
471 msg.info(f
'Renaming temporary merged BS file to {BSFile}')
475 if 'DRAW_TRIGCOST' in self.conf.dataDictionary:
476 splitFailed = self.
_splitBSfile([
'CostMonitoring'], BSFile, self.conf.dataDictionary[
'DRAW_TRIGCOST'].value[0])
479 'Did not produce any BS file when selecting CostMonitoring stream with trigbs_extractStream.py in file')
482 if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
486 if 'BS' in self.conf.dataDictionary:
487 argInDict = self.conf.dataDictionary[
'BS']
489 if 'streamSelection' in self.conf.argdict
and self.conf.argdict[
'streamSelection'].value[0] !=
"All":
490 splitEmpty = self.
_splitBSfile(self.conf.argdict[
'streamSelection'].value, BSFile, argInDict.value[0])
492 msg.info(
'Did not produce any BS file when selecting stream with trigbs_extractStream.py in file')
496 cmd_splitFailed =
'trigbs_failedStreamSelection.py ' + BSFile
497 msg.info(
'running command for creating empty file: %s', cmd_splitFailed)
498 subprocess.call(cmd_splitFailed, shell=
True)
501 runnumber = eformat.EventStorage.pickDataReader(BSFile).
runNumber()
502 expectedOutputFileName =
'T0debug.00'+
str(runnumber)+
'.unknown_debug.unknown.RAW._lb0000._TRF._0001.data'
504 self.
_renamefile(expectedOutputFileName, argInDict.value[0])
506 msg.info(
'Stream "All" requested, so not splitting BS file')
509 msg.info(
'BS output filetype not defined so skip renaming BS')
511 msg.info(
'BS, DRAW_TRIGCOST or HIST_DEBUGSTREAMMON output filetypes not defined so skip BS post processing')
513 msg.info(
'Now run athenaExecutor:postExecute')
516 if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
518 fileNameDbg = self.conf.argdict[
"outputHIST_DEBUGSTREAMMONFile"].value
519 dbgStream.dbgPostRun(argInDict.value[0], fileNameDbg[0], self.conf.argdict, isSplitStream=
True)
524 msg.info(
"debug stream analysis in postExecute")
527 fileNameDbg = self.conf.argdict[
"outputHIST_DEBUGSTREAMMONFile"].value
528 msg.info(
'outputHIST_DEBUGSTREAMMONFile argument is {0}'.
format(fileNameDbg))
530 if(os.path.isfile(fileNameDbg[0])):
532 msg.info(
'Will use file created in PreRun step {0}'.
format(fileNameDbg))
534 msg.info(
'No file created in PreRun step {0}'.
format(fileNameDbg))
537 dbgStream.dbgPostRun(outputBSFile, fileNameDbg[0], self.conf.argdict)