ATLAS Offline Software
Loading...
Searching...
No Matches
trigRecoExe.py
Go to the documentation of this file.
1#!/usr/bin/env python
2
3# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
4
5# @brief: Trigger executor to call base transforms
6# @details: Based on athenaExecutor with some modifications
7# @author: Mark Stockton
8
9import os
10import fnmatch
11import re
12import subprocess
13
14from PyJobTransforms.trfExe import athenaExecutor
15
16# imports for preExecute
17from PyJobTransforms.trfUtils import asetupReport, cvmfsDBReleaseCheck, unpackDBRelease, setupDBRelease, lineByLine, asetupReleaseIsOlderThan
18import PyJobTransforms.trfEnv as trfEnv
19import PyJobTransforms.trfExceptions as trfExceptions
20from PyJobTransforms.trfExitCodes import trfExit as trfExit
21import TrigTransform.dbgAnalysis as dbgStream
22from TrigTransform.trigTranslate import getTranslated as getTranslated
23
24# Setup logging here
25import logging, eformat
26msg = logging.getLogger("PyJobTransforms." + __name__)
27
28# Trig_reco_tf.py executor for BS-BS step (aka running the trigger)
29# used to setup input files/arguments and change output filenames
class trigRecoExecutor(athenaExecutor):
    """Trig_reco_tf.py executor for the BS->BS step (aka running the trigger).

    Based on athenaExecutor; used to set up input files/arguments for athenaHLT
    and to rename/merge/split the output files it produces.
    """

    # preExecute is based on athenaExecutor but with key changes:
    # - removed athenaMP detection
    # - removed environment so does not require the noimf notcmalloc flags
    # - added swap of argument name for runargs file so that athenaHLT reads it in
    def preExecute(self, input = set(), output = set()):
        """Prepare the execution of the BS->BS substep.

        Checks there are events to process, builds the job options, selects the
        athena release/container/DBRelease, translates the runargs for athenaHLT,
        optionally runs the debug-stream preRun analysis, and writes the wrapper.

        @param input: set of input data type names for this substep
        @param output: set of output data type names for this substep
        @raises trfExceptions.TransformExecutionException: no events to process,
                stale output files in the working directory, or a container is
                required without a known athena version
        """
        msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

        # Check we actually have events to process!
        if (self._inputEventTest and 'skipEvents' in self.conf.argdict and
            self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
            msg.debug('Will test for events to process')
            for dataType in input:
                inputEvents = self.conf.dataDictionary[dataType].nentries
                msg.debug('Got {} events for {}'.format(inputEvents, dataType))
                if not isinstance(inputEvents, int):
                    msg.warning('Are input events countable? Got nevents={} so disabling event count check for this input'.format(inputEvents))
                elif self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) >= inputEvents:
                    # fixed unbalanced parenthesis in the original message text
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
                                                                    'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor), inputEvents, dataType))

        # Build the top job options from the skeleton(s), if we have any
        if self._skeleton is not None:
            inputFiles = dict()
            for dataType in input:
                inputFiles[dataType] = self.conf.dataDictionary[dataType]
            outputFiles = dict()
            for dataType in output:
                outputFiles[dataType] = self.conf.dataDictionary[dataType]

            # See if we have any 'extra' file arguments
            for dataType, dataArg in self.conf.dataDictionary.items():
                if dataArg.io == 'input' and self._name in dataArg.executor:
                    inputFiles[dataArg.subtype] = dataArg

            msg.info('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

            # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
            self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
                                                                           output = outputFiles)

        # Advertise the substep inputs/outputs in the executor metadata
        if len(input) > 0:
            self._extraMetadata['inputs'] = list(input)
        if len(output) > 0:
            self._extraMetadata['outputs'] = list(output)

        # Select the athena release to set up (substep-specific --asetup, if given)
        asetupString = None
        if 'asetup' in self.conf.argdict:
            asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        else:
            msg.info('Asetup report: {0}'.format(asetupReport()))

        # If legacy release, bring up centos7 container
        OSSetupString = None
        if asetupString is not None:
            legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
            # use .get() so an unset ALRB_USER_PLATFORM does not raise KeyError
            currentOS = os.environ.get('ALRB_USER_PLATFORM', '')
            if legacyOSRelease and "centos7" not in currentOS:
                OSSetupString = "centos7"
                msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))

        # allow overriding the container OS using a flag
        if 'runInContainer' in self.conf.argdict:
            OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
            msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))

        # Check/unpack the DBRelease if one was requested
        dbrelease = dbsetup = None
        if 'DBRelease' in self.conf.argdict:
            dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
            if dbrelease:
                # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
                dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
                if dbdMatch:
                    msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
                    if not os.access(dbrelease, os.R_OK):
                        msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                        msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                        dbrelease = dbdMatch.group(1)
                        dbsetup = cvmfsDBReleaseCheck(dbrelease)
                    else:
                        # Check if the DBRelease is setup
                        unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                        if unpacked:
                            # Now run the setup.py script to customise the paths to the current location...
                            setupDBRelease(dbsetup)
                # For cvmfs we want just the X.Y.Z release string (and also support 'current')
                else:
                    dbsetup = cvmfsDBReleaseCheck(dbrelease)

        # Look for environment updates and prepare the athena command line
        # NOTE(review): the two statements below were dropped by the doc extraction
        # (hyperlinked lines) and are restored here - TODO confirm against upstream
        self._envUpdate = trfEnv.environmentUpdate()
        # above is needed by _prepAthenaCommandLine, but remove the setStandardEnvironment so doesn't include imf or tcmalloc
        # self._envUpdate.setStandardEnvironment(self.conf.argdict)
        self._prepAthenaCommandLine()

        # to get athenaHLT to read in the relevant parts from the runargs file we have to translate them
        if 'athenaHLT' in self._exe:
            self._cmd.remove('runargs.BSRDOtoRAW.py')
            # get list of translated arguments to be used by athenaHLT
            optionList = getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output = outputFiles)
            self._cmd.extend(optionList)
            # updates for CA
            if self._isCAEnabled():
                msg.info("Running in CA mode")
                # we don't use the runargs file with athenaHLT so add the JO and preExecs to the command line
                self._cmd.append(self._skeletonCA)
                if 'preExec' in self.conf.argdict:
                    self._cmd.extend(self.conf.argdict['preExec'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor))
                msg.info('Command adjusted for CA to %s', self._cmd)
            else:
                msg.info("Running in legacy mode")

        # Run preRun step debug stream analysis if output histogram are set
        if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
            # Do debug stream preRun step and get asetup string from debug stream input files
            dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary['BS_RDO'], self.conf.dataDictionary['HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
            # Setup asetup from debug stream
            # if no --asetup r2b:string was given and is not running with tzero/software/patches as TestArea
            if asetupString is None and dbgAsetupString is not None:
                asetupString = dbgAsetupString
                msg.info('Will use asetup string for debug stream analysis %s', dbgAsetupString)
                # If legacy release, bring up centos7 container
                legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
                # use .get() so an unset ALRB_USER_PLATFORM does not raise KeyError
                currentOS = os.environ.get('ALRB_USER_PLATFORM', '')
                if legacyOSRelease and "centos7" not in currentOS:
                    OSSetupString = "centos7"
                    msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
                # allow overriding the container OS using a flag
                if 'runInContainer' in self.conf.argdict:
                    OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
                    msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))

            # Set database in command line if it was missing
            if 'useDB' in self.conf.argdict and 'DBserver' not in self.conf.argdict and dbAlias:
                # Logger.warn is deprecated, use warning()
                msg.warning("Database alias will be set to %s", dbAlias)
                self._cmd.append("--db-server " + dbAlias)
        else:
            msg.info("Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")

        # The following is needed to avoid conflicts in finding BS files produced by running the HLT step
        # and those already existing in the working directory
        if 'BS' in self.conf.dataDictionary or 'DRAW_TRIGCOST' in self.conf.dataDictionary or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
            # expected string based on knowing that the format will be of form: ####._HLTMPPy_####.data
            expectedOutputFileName = '*_HLTMPPy_*.data'
            # list of filenames of files matching expectedOutputFileName
            matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)
            # check there are no file matches
            if len(matchedOutputFileNames) > 0:
                msg.error(f'Directoy already contains files with expected output name format ({expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                f'Directory already contains files with expected output name format {expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')

        # Sanity check:
        if OSSetupString is not None and asetupString is None:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                            'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')

        # Call athenaExecutor parent as the above overrides what athenaExecutor would have done
        super(athenaExecutor, self).preExecute(input, output)

        # Now we always write a wrapper, because it's very convenient for re-running individual substeps
        # This will have asetup and/or DB release setups in it
        # Do this last in this preExecute as the _cmd needs to be finalised
        msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
        self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
        msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
201
203
204 # When running from the DB athenaHLT needs no skeleton file
205 msg.info("Before build command line check if reading from DB")
206
207 # Check if expecting to run from DB
208 removeSkeleton = False
209 if 'useDB' in self.conf.argdict:
210 removeSkeleton = True
211 elif 'athenaopts' in self.conf.argdict:
212 v = self.conf.argdict['athenaopts'].value
213 if '--use-database' in v or '-b' in v:
214 removeSkeleton = True
215
216 if removeSkeleton and self._isCAEnabled():
217 msg.error('Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
218 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_ERROR'),
219 'Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
220 return 1
221
222 # Due to athenaExecutor code don't remove skeleton, otherwise lose runargs too
223 # instead remove skeleton from _topOptionsFiles
224 if removeSkeleton and self._skeleton is not None:
225 msg.info('Read from DB: remove skeleton {0} from command'.format(self._skeleton))
226 self._topOptionsFiles.remove(self._skeleton[0])
227 else:
228 msg.info('Not reading from DB: keep skeleton in command')
229
230 # Now build command line as in athenaExecutor
231 super(trigRecoExecutor, self)._prepAthenaCommandLine()
232
233 # Loop over current directory and find the output file matching input pattern
234 def _findOutputFiles(self, pattern):
235 # list to store the filenames of files matching pattern
236 matchedOutputFileNames = []
237 # list of input files that could be in the same folder and need ignoring
238 ignoreInputFileNames = []
239 for dataType, dataArg in self.conf.dataDictionary.items():
240 if dataArg.io == 'input':
241 ignoreInputFileNames.append(dataArg.value[0])
242 # loop over all files in folder to find matching output files
243 for file in os.listdir('.'):
244 if fnmatch.fnmatch(file, pattern):
245 if file in ignoreInputFileNames:
246 msg.info('Ignoring input file: %s', file)
247 else:
248 matchedOutputFileNames.append(file)
249 return matchedOutputFileNames
250
251 # merge multiple BS files into a single BS file
252 def _mergeBSfiles(self, inputFiles, outputFile):
253 msg.info(f'Merging multiple BS files ({inputFiles}) into {outputFile}')
254 # Write the list of input files to a text file, to use it as a input for file_merging
255 mergeBSFileList = 'RAWFileMerge.list'
256 try:
257 with open(mergeBSFileList, 'w') as BSFileList:
258 for fname in inputFiles:
259 BSFileList.write(f'{fname}\n')
260 except OSError as e:
261 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
262 f'Got an error when writing list of BS files to {mergeBSFileList}: {e}')
263
264 # The user should never need to use this directly, but check it just in case...
265 if not outputFile.endswith('._0001.data'):
266 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
267 'Output merged BS filename must end in "._0001.data"')
268 else:
269 # We need to remove the suffix to pass the output as an argument to file_merging
270 outputFile = outputFile.split('._0001.data')[0]
271
272 mergeBSFailure = 0
273 try:
274 cmd = f'file_merging {mergeBSFileList} 0 {outputFile}'
275 msg.info('running command for merging (in original asetup env): %s', cmd)
276 mergeBSFailure = subprocess.call(cmd, shell=True)
277 msg.debug('file_merging return code %s', mergeBSFailure)
278 except OSError as e:
279 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
280 f'Exception raised when merging BS files using file_merging: {e}')
281 if mergeBSFailure != 0:
282 msg.error('file_merging returned error (%s) no merged BS file created', mergeBSFailure)
283 return 1
284 return 0
285
286
287 # run trigbs_extractStream.py to split a stream out of the BS file
288 # renames the split file afterwards
289 def _splitBSfile(self, streamsList, allStreamsFileName, splitFileName):
290 # merge list of streams
291 outputStreams = ','.join(str(stream) for stream in streamsList)
292 msg.info('Splitting stream %s from BS file', outputStreams)
293 splitStreamFailure = 0
294 try:
295 cmd = 'trigbs_extractStream.py -s ' + outputStreams + ' ' + allStreamsFileName
296 msg.info('running command for splitting (in original asetup env): %s', cmd)
297 splitStreamFailure = subprocess.call(cmd, shell=True)
298 msg.debug('trigbs_extractStream.py splitting return code %s', splitStreamFailure)
299 except OSError as e:
300 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
301 'Exception raised when selecting stream with trigbs_extractStream.py in file {0}: {1}'.format(allStreamsFileName, e))
302 if splitStreamFailure != 0:
303 msg.warning('trigbs_extractStream.py returned error (%s) no split BS file created', splitStreamFailure)
304 return 1
305 else:
306 # know that the format will be of the form ####._athenaHLT.####.data
307 expectedStreamFileName = '*_athenaHLT*.data'
308 # list of filenames of files matching expectedStreamFileName
309 matchedOutputFileName = self._findOutputFiles(expectedStreamFileName)
310 if(len(matchedOutputFileName)):
311 self._renamefile(matchedOutputFileName[0], splitFileName)
312 return 0
313 else:
314 msg.error('trigbs_extractStream.py did not created expected file (%s)', expectedStreamFileName)
315 return 1
316
317 # rename a created file - used to overwrite filenames from athenaHLT into the requested argument name
318 def _renamefile(self, currentFileName, newFileName):
319 msg.info('Renaming file from %s to %s', currentFileName, newFileName)
320 try:
321 os.rename(currentFileName, newFileName)
322 except OSError as e:
323 msg.error('Exception raised when renaming {0} #to {1}: {2}'.format(currentFileName, newFileName, e))
324 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
325 'Exception raised when renaming {0} #to {1}: {2}'.format(currentFileName, newFileName, e))
326
    def postExecute(self):
        """Post-execution for the BS->BS trigger step.

        Scans the log for HLT child failures, merges child worker logs into the
        main log while counting accepted/rejected events, renames/merges the
        expert-monitoring files, merges/splits/renames the BS output files, and
        runs the debug-stream post-processing when requested.
        """

        # Adding check for HLTMPPU.*Child Issue in the log file
        # - Throws an error message if there so we catch that the child died
        # - Also sets the return code of the mother process to mark the job as failed
        # - Is based on trfValidation.scanLogFile
        log = self._logFileName
        msg.debug('Now scanning logfile {0} for HLTMPPU Child Issues'.format(log))
        # Using the generator so that lines can be grabbed by subroutines if needed for more reporting

        #Count the number of rejected events
        rejected = 0
        #Count the number of accepted events
        accepted = 0

        try:
            myGen = lineByLine(log, substepName=self._substep)
        except IOError as e:
            # NOTE(review): if this except path triggers, myGen is never bound
            # and the loop below raises NameError - consider aborting here (TODO)
            msg.error('Failed to open transform logfile {0}: {1:s}'.format(log, e))
        for line, lineCounter in myGen:
            # Check to see if any of the hlt children had an issue
            if 'Child Issue' in line:
                try:
                    # pull the numeric signal out of the HLTMPPU message
                    signal = int((re.search('signal ([0-9]*)', line)).group(1))
                except AttributeError:
                    # signal not found in message, so return 1 to highlight failure
                    signal = 1
                msg.error('Detected issue with HLTChild, setting mother return code to %s', signal)
                self._rc = signal

        # Merge child log files into parent log file
        # is needed to make sure all child log files are scanned
        # files are found by searching whole folder rather than relying on nprocs being defined
        try:
            # open original log file (log.BSRDOtoRAW) to merge child files into
            with open(self._logFileName, 'a') as merged_file:
                for file in os.listdir('.'):
                    # expected child log files should be of the format athenaHLT:XX.out and .err
                    if fnmatch.fnmatch(file, 'athenaHLT:*'):
                        msg.info('Merging child log file (%s) into %s', file, self._logFileName)
                        with open(file) as log_file:
                            # write header infomation ### Output from athenaHLT:XX.out/err ###
                            merged_file.write('### Output from {} ###\n'.format(file))
                            # write out file line by line
                            for line in log_file:
                                merged_file.write(line)
                                # Check for rejected events in log file
                                # NOTE(review): assumes the counter starts at fixed
                                # column 14 of the log line - TODO confirm format
                                if 'rejected:' in line and int(line[14]) != 0:
                                    #Add the number of rejected events
                                    rejected += int(line[14:])
                                # Check for accepted events in log file
                                if 'accepted:' in line and int(line[14]) != 0:
                                    #Add the number of accepted events
                                    accepted += int(line[14:])
                                # a communication/timeout error from the emulated DCM session is fatal
                                if re.search('DFDcmEmuSession.* Communication error', line) or re.search('DFDcmEmuSession.* No new event provided within the timeout limit', line):
                                    msg.error('Caught DFDcmEmuSession error, aborting job')
                                    self._rc = 1

            if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
                # Add the HLT_accepted_events and HLT_rejected_events histograms to the output file
                dbgStream.getHltDecision(accepted, rejected, self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value[0])

        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                            'Exception raised when merging log files into {0}: {1}'.format(self._logFileName, e))

        msg.info("Check for expert-monitoring.root file")
        # the BS-BS step generates the files:
        # - expert-monitoring.root (from mother process)
        # - athenaHLT_workers/*/expert-monitoring.root (from child processes)
        # to save on panda it needs to be renamed via the outputHIST_HLTMONFile argument
        expectedFileName = 'expert-monitoring.root'

        # first check athenaHLT step actually completed
        if self._rc != 0:
            msg.info('HLT step failed (with status %s) so skip HIST_HLTMON filename check', self._rc)
        # next check argument is in dictionary as a requested output
        elif 'outputHIST_HLTMONFile' in self.conf.argdict:

            # rename the mother file
            expectedMotherFileName = 'expert-monitoring-mother.root'
            if(os.path.isfile(expectedFileName)):
                msg.info('Renaming %s to %s', expectedFileName, expectedMotherFileName)
                try:
                    os.rename(expectedFileName, expectedMotherFileName)
                except OSError as e:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                    'Exception raised when renaming {0} to {1}: {2}'.format(expectedFileName, expectedMotherFileName, e))
            else:
                msg.error('HLTMON argument defined but mother %s not created', expectedFileName)

            # merge worker files
            expectedWorkerFileName = 'athenaHLT_workers/athenaHLT-01/' + expectedFileName
            if(os.path.isfile(expectedWorkerFileName) and os.path.isfile(expectedMotherFileName)):
                msg.info('Merging worker and mother %s files to %s', expectedFileName, self.conf.argdict['outputHIST_HLTMONFile'].value[0])
                try:
                    # have checked that at least one worker file exists
                    cmd = 'hadd ' + self.conf.argdict['outputHIST_HLTMONFile'].value[0] + ' athenaHLT_workers/*/expert-monitoring.root expert-monitoring-mother.root'
                    subprocess.call(cmd, shell=True)
                except OSError as e:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                    'Exception raised when merging worker and mother {0} files to {1}: {2}'.format(expectedFileName, self.conf.argdict['outputHIST_HLTMONFile'].value[0], e))
            else:
                msg.error('HLTMON argument defined %s but worker %s not created', self.conf.argdict['outputHIST_HLTMONFile'].value[0], expectedFileName)

        else:
            msg.info('HLTMON argument not defined so skip %s check', expectedFileName)

        msg.info("Search for created BS files, and rename if single file found")
        # The following is needed to handle the BS file being written with a different name (or names)
        # base is from either the tmp value created by the transform or the value entered by the user

        argInDict = {}
        if self._rc != 0:
            msg.error('HLT step failed (with status %s) so skip BS filename check', self._rc)
        elif 'BS' in self.conf.dataDictionary or 'DRAW_TRIGCOST' in self.conf.dataDictionary or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
            # expected string based on knowing that the format will be of form: ####._HLTMPPy_####.data
            expectedOutputFileName = '*_HLTMPPy_*.data'
            # list of filenames of files matching expectedOutputFileName
            matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)

            # check there are file matches and rename appropriately
            if len(matchedOutputFileNames) == 0:
                msg.error('No BS files created with expected name: %s - please check for earlier failures', expectedOutputFileName)
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                f'No BS files created with expected name: {expectedOutputFileName} - please check for earlier failures')
            else:
                # if only one BS file was created
                if len(matchedOutputFileNames) == 1:
                    msg.info('Single BS file found: will split (if requested) and rename the file')
                    BSFile = matchedOutputFileNames[0]

                # if more than one file BS was created, then merge them
                else:
                    msg.info('Multiple BS files found. A single BS file is required by the next transform steps, so they will be merged. Will split the merged file (if requested) and rename the file')
                    mergedBSFile = matchedOutputFileNames[0].split('._0001.data')[0] + '.mrg._0001.data' # a specific format is required to run file_merging

                    mergeFailed = self._mergeBSfiles(matchedOutputFileNames, mergedBSFile)
                    if(mergeFailed):
                        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                        'Did not produce a merged BS file with file_merging')

                    BSFile = 'tmp.BS.mrg'
                    msg.info(f'Renaming temporary merged BS file to {BSFile}')
                    self._renamefile(mergedBSFile, BSFile)

                # First check if we want to produce the COST DRAW output
                if 'DRAW_TRIGCOST' in self.conf.dataDictionary:
                    splitFailed = self._splitBSfile(['CostMonitoring'], BSFile, self.conf.dataDictionary['DRAW_TRIGCOST'].value[0])
                    if(splitFailed):
                        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                        'Did not produce any BS file when selecting CostMonitoring stream with trigbs_extractStream.py in file')

                # Run debug step for all streams
                if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
                    self._postExecuteDebug(BSFile)

                # Rename BS file if requested
                if 'BS' in self.conf.dataDictionary:
                    argInDict = self.conf.dataDictionary['BS']
                    # If a stream (not All) is selected, then slim the orignal (many stream) BS output to the particular stream
                    if 'streamSelection' in self.conf.argdict and self.conf.argdict['streamSelection'].value[0] != "All":
                        splitEmpty = self._splitBSfile(self.conf.argdict['streamSelection'].value, BSFile, argInDict.value[0])
                        if(splitEmpty):
                            msg.info('Did not produce any BS file when selecting stream with trigbs_extractStream.py in file')
                            #If splitEmpty==1, the chosen streams contained no events
                            #then run the command to produce an empty BS file and rename it to RAW.pool.root
                            #this stops non-zero exit code for rejected events
                            cmd_splitFailed = 'trigbs_failedStreamSelection.py ' + BSFile
                            msg.info('running command for creating empty file: %s', cmd_splitFailed)
                            subprocess.call(cmd_splitFailed, shell=True)
                            #Rename the empty file to "RAW.pool.root" to prevent failure
                            #expected filename will be of form: T0debug.runnumber.unknown_debug.unknown.RAW._lb0000._TRF._0001.data
                            runnumber = eformat.EventStorage.pickDataReader(BSFile).runNumber()
                            expectedOutputFileName = 'T0debug.00'+str(runnumber)+'.unknown_debug.unknown.RAW._lb0000._TRF._0001.data'
                            #rename the file to RAW.pool.root, this file will contain 0 events
                            self._renamefile(expectedOutputFileName, argInDict.value[0])
                    else:
                        msg.info('Stream "All" requested, so not splitting BS file')
                        self._renamefile(BSFile, argInDict.value[0])
                else:
                    msg.info('BS output filetype not defined so skip renaming BS')
        else:
            msg.info('BS, DRAW_TRIGCOST or HIST_DEBUGSTREAMMON output filetypes not defined so skip BS post processing')

        msg.info('Now run athenaExecutor:postExecute')
        super(trigRecoExecutor, self).postExecute()

        if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
            # Do debug stream postRun step for BS file that contains events after the streamSelection
            # NOTE(review): argInDict is still the empty dict {} when 'BS' is not in
            # dataDictionary, so .value would raise AttributeError here - TODO confirm
            fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
            dbgStream.dbgPostRun(argInDict.value[0], fileNameDbg[0], self.conf.argdict, isSplitStream=True)
520
521
522 def _postExecuteDebug(self, outputBSFile):
523 # Run postRun step debug stream analysis if output BS file and output histogram are set
524 msg.info("debug stream analysis in postExecute")
525
526 # Set file name for debug stream analysis output
527 fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
528 msg.info('outputHIST_DEBUGSTREAMMONFile argument is {0}'.format(fileNameDbg))
529
530 if(os.path.isfile(fileNameDbg[0])):
531 # Keep filename if not defined
532 msg.info('Will use file created in PreRun step {0}'.format(fileNameDbg))
533 else:
534 msg.info('No file created in PreRun step {0}'.format(fileNameDbg))
535
536 # Do debug stream postRun step
537 dbgStream.dbgPostRun(outputBSFile, fileNameDbg[0], self.conf.argdict)
538
if(febId1==febId2)
Class holding the update to an environment that will be passed on to an executor.
Definition trfEnv.py:15
Base class for execution exceptions.
_splitBSfile(self, streamsList, allStreamsFileName, splitFileName)
_renamefile(self, currentFileName, newFileName)
_postExecuteDebug(self, outputBSFile)
preExecute(self, input=set(), output=set())
_mergeBSfiles(self, inputFiles, outputFile)
STL class.
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
Transform execution functions.
Module for transform exit codes.
Transform utility functions.