18 from PyJobTransforms.trfUtils import asetupReport, cvmfsDBReleaseCheck, unpackDBRelease, setupDBRelease, lineByLine, asetupReleaseIsOlderThan
19 import PyJobTransforms.trfEnv
as trfEnv
20 import PyJobTransforms.trfExceptions
as trfExceptions
22 import TrigTransform.dbgAnalysis
as dbgStream
23 from TrigTransform.trigTranslate
import getTranslated
as getTranslated
26 import logging, eformat
27 msg = logging.getLogger(
"PyJobTransforms." + __name__)
38 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.
format(self.name, input, output))
41 if (self._inputEventTest
and 'skipEvents' in self.conf.argdict
and
42 self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
is not None):
43 msg.debug(
'Will test for events to process')
44 for dataType
in input:
45 inputEvents = self.conf.dataDictionary[dataType].nentries
46 msg.debug(
'Got {0} events for {1}'.
format(inputEvents, dataType))
47 if not isinstance(inputEvents, six.integer_types):
48 msg.warning(
'Are input events countable? Got nevents={0} so disabling event count check for this input'.
format(inputEvents))
49 elif self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) >= inputEvents:
51 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.
format(self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor), inputEvents, dataType))
54 if self._skeleton
is not None:
56 for dataType
in input:
57 inputFiles[dataType] = self.conf.dataDictionary[dataType]
59 for dataType
in output:
60 outputFiles[dataType] = self.conf.dataDictionary[dataType]
63 for dataType, dataArg
in self.conf.dataDictionary.items():
64 if dataArg.io ==
'input' and self._name
in dataArg.executor:
65 inputFiles[dataArg.subtype] = dataArg
67 msg.info(
'Input Files: {0}; Output Files: {1}'.
format(inputFiles, outputFiles))
76 self._extraMetadata[
'inputs'] =
list(input)
78 self._extraMetadata[
'outputs'] =
list(output)
82 if 'asetup' in self.conf.argdict:
83 asetupString = self.conf.argdict[
'asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
89 if asetupString
is not None:
91 currentOS = os.environ[
'ALRB_USER_PLATFORM']
92 if legacyOSRelease
and "centos7" not in currentOS:
93 OSSetupString =
"centos7"
94 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.
format(self._substep, OSSetupString))
97 if 'runInContainer' in self.conf.argdict:
98 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
99 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.
format(self._substep, OSSetupString))
102 dbrelease = dbsetup =
None
103 if 'DBRelease' in self.conf.argdict:
104 dbrelease = self.conf.argdict[
'DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
107 dbdMatch = re.match(
r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
109 msg.debug(
'DBRelease setting {0} matches classic tarball file'.
format(dbrelease))
110 if not os.access(dbrelease, os.R_OK):
111 msg.warning(
'Transform was given tarball DBRelease file {0}, but this is not there'.
format(dbrelease))
112 msg.warning(
'I will now try to find DBRelease {0} in cvmfs'.
format(dbdMatch.group(1)))
113 dbrelease = dbdMatch.group(1)
117 unpacked, dbsetup =
unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
132 if 'athenaHLT' in self._exe:
133 self._cmd.
remove(
'runargs.BSRDOtoRAW.py')
135 optionList =
getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output = outputFiles)
136 self._cmd.
extend(optionList)
138 if self._isCAEnabled():
139 msg.info(
"Running in CA mode")
141 self._cmd.
append(self._skeletonCA)
142 if 'preExec' in self.conf.argdict:
143 self._cmd.
extend(self.conf.argdict[
'preExec'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor))
144 msg.info(
'Command adjusted for CA to %s', self._cmd)
146 msg.info(
"Running in legacy mode")
149 if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
151 dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary[
'BS_RDO'], self.conf.dataDictionary[
'HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
154 if asetupString
is None and dbgAsetupString
is not None:
155 asetupString = dbgAsetupString
156 msg.info(
'Will use asetup string for debug stream analysis %s', dbgAsetupString)
159 currentOS = os.environ[
'ALRB_USER_PLATFORM']
160 if legacyOSRelease
and "centos7" not in currentOS:
161 OSSetupString =
"centos7"
162 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.
format(self._substep, OSSetupString))
164 if 'runInContainer' in self.conf.argdict:
165 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
166 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.
format(self._substep, OSSetupString))
169 if 'useDB' in self.conf.argdict
and 'DBserver' not in self.conf.argdict
and dbAlias:
170 msg.warn(
"Database alias will be set to %s", dbAlias)
171 self._cmd.
append(
"--db-server " + dbAlias)
173 msg.info(
"Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")
177 if 'BS' in self.conf.dataDictionary
or 'DRAW_TRIGCOST' in self.conf.dataDictionary
or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
179 expectedOutputFileName =
'*_HLTMPPy_*.data'
183 if len(matchedOutputFileNames) > 0:
184 msg.error(f
'Directoy already contains files with expected output name format ({expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
186 f
'Directory already contains files with expected output name format {expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')
189 if OSSetupString
is not None and asetupString
is None:
191 'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')
194 super(athenaExecutor, self).
preExecute(input, output)
199 msg.info(
'Now writing wrapper for substep executor {0}'.
format(self._name))
200 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
201 msg.info(
'Athena will be executed in a subshell via {0}'.
format(self._cmd))
206 msg.info(
"Before build command line check if reading from DB")
209 removeSkeleton =
False
210 if 'useDB' in self.conf.argdict:
211 removeSkeleton =
True
212 elif 'athenaopts' in self.conf.argdict:
213 v = self.conf.argdict[
'athenaopts'].value
214 if '--use-database' in v
or '-b' in v:
215 removeSkeleton =
True
217 if removeSkeleton
and self._isCAEnabled():
218 msg.error(
'Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
220 'Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
225 if removeSkeleton
and self._skeleton
is not None:
226 msg.info(
'Read from DB: remove skeleton {0} from command'.
format(self._skeleton))
229 msg.info(
'Not reading from DB: keep skeleton in command')
237 matchedOutputFileNames = []
239 ignoreInputFileNames = []
240 for dataType, dataArg
in self.conf.dataDictionary.items():
241 if dataArg.io ==
'input':
242 ignoreInputFileNames.append(dataArg.value[0])
244 for file
in os.listdir(
'.'):
245 if fnmatch.fnmatch(file, pattern):
246 if file
in ignoreInputFileNames:
247 msg.info(
'Ignoring input file: %s', file)
249 matchedOutputFileNames.append(file)
250 return matchedOutputFileNames
254 msg.info(f
'Merging multiple BS files ({inputFiles}) into {outputFile}')
256 mergeBSFileList =
'RAWFileMerge.list'
258 with open(mergeBSFileList,
'w')
as BSFileList:
259 for fname
in inputFiles:
260 BSFileList.write(f
'{fname}\n')
263 f
'Got an error when writing list of BS files to {mergeBSFileList}: {e}')
266 if not outputFile.endswith(
'._0001.data'):
268 'Output merged BS filename must end in "._0001.data"')
271 outputFile = outputFile.split(
'._0001.data')[0]
275 cmd = f
'file_merging {mergeBSFileList} 0 {outputFile}'
276 msg.info(
'running command for merging (in original asetup env): %s', cmd)
277 mergeBSFailure = subprocess.call(cmd, shell=
True)
278 msg.debug(
'file_merging return code %s', mergeBSFailure)
281 f
'Exception raised when merging BS files using file_merging: {e}')
282 if mergeBSFailure != 0:
283 msg.error(
'file_merging returned error (%s) no merged BS file created', mergeBSFailure)
290 def _splitBSfile(self, streamsList, allStreamsFileName, splitFileName):
    # Extract the requested trigger streams from a full bytestream (BS) file
    # by shelling out to trigbs_extractStream.py, then rename the produced
    # file to splitFileName.
    #
    # NOTE(review): this excerpt is line-mangled and several original lines
    # are missing from this view (the docstring at ~291, a try at ~295, the
    # except header for the message at ~302, and the glob/match that fills
    # matchedOutputFileName at ~309-310). Visible code is kept byte-identical.
    #
    # Build the comma-separated stream list for the "-s" argument.
292 outputStreams =
','.
join(
str(stream)
for stream
in streamsList)
293 msg.info(
'Splitting stream %s from BS file', outputStreams)
294 splitStreamFailure = 0
    # Command run in the original asetup environment (see log message below).
296 cmd =
'trigbs_extractStream.py -s ' + outputStreams +
' ' + allStreamsFileName
297 msg.info(
'running command for splitting (in original asetup env): %s', cmd)
    # shell=True on a locally-built command string; inputs are internal file
    # and stream names, presumably not untrusted — TODO confirm upstream.
298 splitStreamFailure = subprocess.call(cmd, shell=
True)
299 msg.debug(
'trigbs_extractStream.py splitting return code %s', splitStreamFailure)
    # Message below belongs to an except handler whose opening lines are not
    # visible in this excerpt ("e" is the caught exception).
302 'Exception raised when selecting stream with trigbs_extractStream.py in file {0}: {1}'.
format(allStreamsFileName, e))
    # Non-zero exit status from the splitter: warn that no split file exists.
303 if splitStreamFailure != 0:
304 msg.warning(
'trigbs_extractStream.py returned error (%s) no split BS file created', splitStreamFailure)
    # Glob pattern trigbs_extractStream.py output is expected to match; the
    # code assigning matchedOutputFileName (original ~309-310) is missing here.
308 expectedStreamFileName =
'*_athenaHLT*.data'
    # If a matching output file was found, rename it to the requested name;
    # otherwise report that the expected file never appeared.
311 if(len(matchedOutputFileName)):
312 self.
_renamefile(matchedOutputFileName[0], splitFileName)
315 msg.error(
'trigbs_extractStream.py did not created expected file (%s)', expectedStreamFileName)
320 msg.info(
'Renaming file from %s to %s', currentFileName, newFileName)
322 os.rename(currentFileName, newFileName)
324 msg.error(
'Exception raised when renaming {0} #to {1}: {2}'.
format(currentFileName, newFileName, e))
326 'Exception raised when renaming {0} #to {1}: {2}'.
format(currentFileName, newFileName, e))
334 log = self._logFileName
335 msg.debug(
'Now scanning logfile {0} for HLTMPPU Child Issues'.
format(log))
344 myGen =
lineByLine(log, substepName=self._substep)
346 msg.error(
'Failed to open transform logfile {0}: {1:s}'.
format(log, e))
347 for line, lineCounter
in myGen:
349 if 'Child Issue' in line:
351 signal =
int((re.search(
'signal ([0-9]*)', line)).
group(1))
352 except AttributeError:
355 msg.error(
'Detected issue with HLTChild, setting mother return code to %s', signal)
363 with open(self._logFileName,
'a')
as merged_file:
364 for file
in os.listdir(
'.'):
366 if fnmatch.fnmatch(file,
'athenaHLT:*'):
367 msg.info(
'Merging child log file (%s) into %s', file, self._logFileName)
368 with open(file)
as log_file:
370 merged_file.write(
'### Output from {} ###\n'.
format(file))
372 for line
in log_file:
373 merged_file.write(line)
375 if 'rejected:' in line
and int(line[14]) != 0:
377 rejected +=
int(line[14:])
379 if 'accepted:' in line
and int(line[14]) != 0:
381 accepted +=
int(line[14:])
383 if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
385 dbgStream.getHltDecision(accepted, rejected, self.conf.argdict[
"outputHIST_DEBUGSTREAMMONFile"].value[0])
389 'Exception raised when merging log files into {0}: {1}'.
format(self._logFileName, e))
391 msg.info(
"Check for expert-monitoring.root file")
396 expectedFileName =
'expert-monitoring.root'
400 msg.info(
'HLT step failed (with status %s) so skip HIST_HLTMON filename check', self.
_rc)
402 elif 'outputHIST_HLTMONFile' in self.conf.argdict:
405 expectedMotherFileName =
'expert-monitoring-mother.root'
406 if(os.path.isfile(expectedFileName)):
407 msg.info(
'Renaming %s to %s', expectedFileName, expectedMotherFileName)
409 os.rename(expectedFileName, expectedMotherFileName)
412 'Exception raised when renaming {0} to {1}: {2}'.
format(expectedFileName, expectedMotherFileName, e))
414 msg.error(
'HLTMON argument defined but mother %s not created', expectedFileName)
417 expectedWorkerFileName =
'athenaHLT_workers/athenaHLT-01/' + expectedFileName
418 if(os.path.isfile(expectedWorkerFileName)
and os.path.isfile(expectedMotherFileName)):
419 msg.info(
'Merging worker and mother %s files to %s', expectedFileName, self.conf.argdict[
'outputHIST_HLTMONFile'].value[0])
422 cmd =
'hadd ' + self.conf.argdict[
'outputHIST_HLTMONFile'].value[0] +
' athenaHLT_workers/*/expert-monitoring.root expert-monitoring-mother.root'
423 subprocess.call(cmd, shell=
True)
426 'Exception raised when merging worker and mother {0} files to {1}: {2}'.
format(expectedFileName, self.conf.argdict[
'outputHIST_HLTMONFile'].value[0], e))
428 msg.error(
'HLTMON argument defined %s but worker %s not created', self.conf.argdict[
'outputHIST_HLTMONFile'].value[0], expectedFileName)
431 msg.info(
'HLTMON argument not defined so skip %s check', expectedFileName)
433 msg.info(
"Search for created BS files, and rename if single file found")
439 msg.error(
'HLT step failed (with status %s) so skip BS filename check', self.
_rc)
440 elif 'BS' in self.conf.dataDictionary
or 'DRAW_TRIGCOST' in self.conf.dataDictionary
or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
442 expectedOutputFileName =
'*_HLTMPPy_*.data'
448 if len(matchedOutputFileNames) == 0:
449 msg.error(
'No BS files created with expected name: %s - please check for earlier failures', expectedOutputFileName)
451 f
'No BS files created with expected name: {expectedOutputFileName} - please check for earlier failures')
454 if len(matchedOutputFileNames) == 1:
455 msg.info(
'Single BS file found: will split (if requested) and rename the file')
456 BSFile = matchedOutputFileNames[0]
460 msg.info(
'Multiple BS files found. A single BS file is required by the next transform steps, so they will be merged. Will split the merged file (if requested) and rename the file')
461 mergedBSFile = matchedOutputFileNames[0].
split(
'._0001.data')[0] +
'.mrg._0001.data'
463 mergeFailed = self.
_mergeBSfiles(matchedOutputFileNames, mergedBSFile)
466 'Did not produce a merged BS file with file_merging')
468 BSFile =
'tmp.BS.mrg'
469 msg.info(f
'Renaming temporary merged BS file to {BSFile}')
473 if 'DRAW_TRIGCOST' in self.conf.dataDictionary:
474 splitFailed = self.
_splitBSfile([
'CostMonitoring'], BSFile, self.conf.dataDictionary[
'DRAW_TRIGCOST'].value[0])
477 'Did not produce any BS file when selecting CostMonitoring stream with trigbs_extractStream.py in file')
480 if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
484 if 'BS' in self.conf.dataDictionary:
485 argInDict = self.conf.dataDictionary[
'BS']
487 if 'streamSelection' in self.conf.argdict
and self.conf.argdict[
'streamSelection'].value[0] !=
"All":
488 splitEmpty = self.
_splitBSfile(self.conf.argdict[
'streamSelection'].value, BSFile, argInDict.value[0])
490 msg.info(
'Did not produce any BS file when selecting stream with trigbs_extractStream.py in file')
494 cmd_splitFailed =
'trigbs_failedStreamSelection.py ' + BSFile
495 msg.info(
'running command for creating empty file: %s', cmd_splitFailed)
496 subprocess.call(cmd_splitFailed, shell=
True)
499 runnumber = eformat.EventStorage.pickDataReader(BSFile).
runNumber()
500 expectedOutputFileName =
'T0debug.00'+
str(runnumber)+
'.unknown_debug.unknown.RAW._lb0000._TRF._0001.data'
502 self.
_renamefile(expectedOutputFileName, argInDict.value[0])
504 msg.info(
'Stream "All" requested, so not splitting BS file')
507 msg.info(
'BS output filetype not defined so skip renaming BS')
509 msg.info(
'BS, DRAW_TRIGCOST or HIST_DEBUGSTREAMMON output filetypes not defined so skip BS post processing')
511 msg.info(
'Now run athenaExecutor:postExecute')
514 if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
516 fileNameDbg = self.conf.argdict[
"outputHIST_DEBUGSTREAMMONFile"].value
517 dbgStream.dbgPostRun(argInDict.value[0], fileNameDbg[0], self.conf.argdict, isSplitStream=
True)
522 msg.info(
"debug stream analysis in postExecute")
525 fileNameDbg = self.conf.argdict[
"outputHIST_DEBUGSTREAMMONFile"].value
526 msg.info(
'outputHIST_DEBUGSTREAMMONFile argument is {0}'.
format(fileNameDbg))
528 if(os.path.isfile(fileNameDbg[0])):
530 msg.info(
'Will use file created in PreRun step {0}'.
format(fileNameDbg))
532 msg.info(
'No file created in PreRun step {0}'.
format(fileNameDbg))
535 dbgStream.dbgPostRun(outputBSFile, fileNameDbg[0], self.conf.argdict)