ATLAS Offline Software
Loading...
Searching...
No Matches
trigRecoExe.py
Go to the documentation of this file.
1#!/usr/bin/env python
2
3# Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
4
5# @brief: Trigger executor to call base transforms
6# @details: Based on athenaExecutor with some modifications
7# @author: Mark Stockton
8
9import os
10import fnmatch
11import re
12import subprocess
13
14from PyJobTransforms.trfExe import athenaExecutor
15
16# imports for preExecute
17from PyJobTransforms.trfUtils import asetupReport, cvmfsDBReleaseCheck, unpackDBRelease, setupDBRelease, lineByLine, asetupReleaseIsOlderThan
18import PyJobTransforms.trfEnv as trfEnv
19import PyJobTransforms.trfExceptions as trfExceptions
20from PyJobTransforms.trfExitCodes import trfExit as trfExit
21import TrigTransform.dbgAnalysis as dbgStream
22from TrigTransform.trigTranslate import getTranslated as getTranslated
23
24# Setup logging here
25import logging, eformat
26msg = logging.getLogger("PyJobTransforms." + __name__)
27
28# Trig_reco_tf.py executor for BS-BS step (aka running the trigger)
29# used to setup input files/arguments and change output filenames
30class trigRecoExecutor(athenaExecutor):
31
32 # preExecute is based on athenaExecutor but with key changes:
33 # - removed athenaMP detection
34 # - removed environment so does not require the noimf notcmalloc flags
35 # - added swap of argument name for runargs file so that athenaHLT reads it in
    def preExecute(self, input = set(), output = set()):
        """Prepare the trigger BS-BS substep for execution.

        Based on athenaExecutor.preExecute but with key changes:
        - no athenaMP detection
        - no forced imf/tcmalloc environment
        - runargs file arguments are translated so athenaHLT can read them

        @param input: set of input data type names for this substep
        @param output: set of output data type names for this substep
        @raises trfExceptions.TransformExecutionException: if there are no
                events to process (TRF_NOEVENTS), if files matching the
                expected output name already exist (TRF_OUTPUT_FILE_ERROR),
                or if a container is requested without an athena release
                being available (TRF_EXEC_SETUP_FAIL)
        """
        msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

        # Check we actually have events to process!
        # Only done when the executor tests input events and skipEvents is set
        if (self._inputEventTest and 'skipEvents' in self.conf.argdict and
            self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
            msg.debug('Will test for events to process')
            for dataType in input:
                inputEvents = self.conf.dataDictionary[dataType].nentries
                msg.debug('Got {} events for {}'.format(inputEvents, dataType))
                # A non-integer nentries means the input could not be counted,
                # so the event count check is skipped for that input
                if not isinstance(inputEvents, int):
                    msg.warning('Are input events countable? Got nevents={} so disabling event count check for this input'.format(inputEvents))
                elif self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) >= inputEvents:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
                                                                    'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.format(self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor), inputEvents, dataType))

        # Build the top job options (runargs + skeletons) when a skeleton is configured
        if self._skeleton is not None:
            inputFiles = dict()
            for dataType in input:
                inputFiles[dataType] = self.conf.dataDictionary[dataType]
            outputFiles = dict()
            for dataType in output:
                outputFiles[dataType] = self.conf.dataDictionary[dataType]

            # See if we have any 'extra' file arguments
            for dataType, dataArg in self.conf.dataDictionary.items():
                if dataArg.io == 'input' and self._name in dataArg.executor:
                    inputFiles[dataArg.subtype] = dataArg

            msg.info('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

            # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
            self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
                                                                           output = outputFiles)

        # Record inputs/outputs in the extra job report metadata
        if len(input) > 0:
            self._extraMetadata['inputs'] = list(input)
        if len(output) > 0:
            self._extraMetadata['outputs'] = list(output)

        # Pick up the asetup string for this substep, if one was given
        asetupString = None
        if 'asetup' in self.conf.argdict:
            asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        else:
            msg.info('Asetup report: {0}'.format(asetupReport()))

        # If legacy release, bring up centos7 container
        OSSetupString = None
        if asetupString is not None:
            legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
            # NOTE(review): assumes ALRB_USER_PLATFORM is always set in the
            # environment - confirm, otherwise this raises KeyError
            currentOS = os.environ['ALRB_USER_PLATFORM']
            if legacyOSRelease and "centos7" not in currentOS:
                OSSetupString = "centos7"
                msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))

            # allow overriding the container OS using a flag
            if 'runInContainer' in self.conf.argdict:
                OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
                msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))

        # Database setup: either a classic DBRelease tarball or a cvmfs release
        dbrelease = dbsetup = None
        if 'DBRelease' in self.conf.argdict:
            dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
            if dbrelease:
                # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
                dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
                if dbdMatch:
                    msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
                    if not os.access(dbrelease, os.R_OK):
                        msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                        msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                        dbrelease = dbdMatch.group(1)
                        dbsetup = cvmfsDBReleaseCheck(dbrelease)
                    else:
                        # Check if the DBRelease is setup
                        unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                        if unpacked:
                            # Now run the setup.py script to customise the paths to the current location...
                            setupDBRelease(dbsetup)
                # For cvmfs we want just the X.Y.Z release string (and also support 'current')
                else:
                    dbsetup = cvmfsDBReleaseCheck(dbrelease)

        # Look for environment updates and prepare the athena command line
        # above is needed by _prepAthenaCommandLine, but remove the setStandardEnvironment so doesn't include imf or tcmalloc
        # self._envUpdate.setStandardEnvironment(self.conf.argdict)
        # NOTE(review): the _envUpdate initialisation and the call to
        # self._prepAthenaCommandLine() appear to have been lost from this
        # listing (self._cmd is used below) - confirm against the original
        # athenaExecutor.preExecute flow

        # to get athenaHLT to read in the relevant parts from the runargs file we have to translate them
        if 'athenaHLT' in self._exe:
            self._cmd.remove('runargs.BSRDOtoRAW.py')
            # get list of translated arguments to be used by athenaHLT
            optionList = getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output = outputFiles)
            self._cmd.extend(optionList)
            # updates for CA
            if self._isCAEnabled():
                msg.info("Running in CA mode")
                # we don't use the runargs file with athenaHLT so add the JO and preExecs to the command line
                self._cmd.append(self._skeletonCA)
                if 'preExec' in self.conf.argdict:
                    self._cmd.extend(self.conf.argdict['preExec'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor))
                msg.info('Command adjusted for CA to %s', self._cmd)
            else:
                msg.info("Running in legacy mode")

            # Run preRun step debug stream analysis if output histogram are set
            if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
                # Do debug stream preRun step and get asetup string from debug stream input files
                dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary['BS_RDO'], self.conf.dataDictionary['HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
                # Setup asetup from debug stream
                # if no --asetup r2b:string was given and is not running with tzero/software/patches as TestArea
                if asetupString is None and dbgAsetupString is not None:
                    asetupString = dbgAsetupString
                    msg.info('Will use asetup string for debug stream analysis %s', dbgAsetupString)
                    # If legacy release, bring up centos7 container
                    legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
                    currentOS = os.environ['ALRB_USER_PLATFORM']
                    if legacyOSRelease and "centos7" not in currentOS:
                        OSSetupString = "centos7"
                        msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
                    # allow overriding the container OS using a flag
                    if 'runInContainer' in self.conf.argdict:
                        OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
                        msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))

                # Set database in command line if it was missing
                if 'useDB' in self.conf.argdict and 'DBserver' not in self.conf.argdict and dbAlias:
                    msg.warn("Database alias will be set to %s", dbAlias)
                    self._cmd.append("--db-server " + dbAlias)
            else:
                msg.info("Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")

        # The following is needed to avoid conflicts in finding BS files produced by running the HLT step
        # and those already existing in the working directory
        if 'BS' in self.conf.dataDictionary or 'DRAW_TRIGCOST' in self.conf.dataDictionary or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
            expectedOutputFileName = '*_HLTMPPy_RAW.pool.root.*.data'
            # list of filenames of files matching expectedOutputFileName
            matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)
            # check there are no file matches
            if len(matchedOutputFileNames) > 0:
                msg.error(f'Directoy already contains files with expected output name format ({expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                f'Directory already contains files with expected output name format {expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')

        # Sanity check: a container cannot be set up without knowing the release
        if OSSetupString is not None and asetupString is None:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                            'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')

        # Call athenaExecutor parent as the above overrides what athenaExecutor would have done
        super(athenaExecutor, self).preExecute(input, output)

        # Now we always write a wrapper, because it's very convenient for re-running individual substeps
        # This will have asetup and/or DB release setups in it
        # Do this last in this preExecute as the _cmd needs to be finalised
        msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
        self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
        msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
200
202
203 # When running from the DB athenaHLT needs no skeleton file
204 msg.info("Before build command line check if reading from DB")
205
206 # Check if expecting to run from DB
207 removeSkeleton = False
208 if 'useDB' in self.conf.argdict:
209 removeSkeleton = True
210 elif 'athenaopts' in self.conf.argdict:
211 v = self.conf.argdict['athenaopts'].value
212 if '--use-database' in v or '-b' in v:
213 removeSkeleton = True
214
215 if removeSkeleton and self._isCAEnabled():
216 msg.error('Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
217 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_ERROR'),
218 'Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
219 return 1
220
221 # Due to athenaExecutor code don't remove skeleton, otherwise lose runargs too
222 # instead remove skeleton from _topOptionsFiles
223 if removeSkeleton and self._skeleton is not None:
224 msg.info('Read from DB: remove skeleton {0} from command'.format(self._skeleton))
225 self._topOptionsFiles.remove(self._skeleton[0])
226 else:
227 msg.info('Not reading from DB: keep skeleton in command')
228
229 # Now build command line as in athenaExecutor
230 super(trigRecoExecutor, self)._prepAthenaCommandLine()
231
232 # Loop over current directory and find the output file matching input pattern
233 def _findOutputFiles(self, pattern):
234 # list to store the filenames of files matching pattern
235 matchedOutputFileNames = []
236 # list of input files that could be in the same folder and need ignoring
237 ignoreInputFileNames = []
238 for dataType, dataArg in self.conf.dataDictionary.items():
239 if dataArg.io == 'input':
240 ignoreInputFileNames.append(dataArg.value[0])
241 # loop over all files in folder to find matching output files
242 for file in os.listdir('.'):
243 if fnmatch.fnmatch(file, pattern):
244 if file in ignoreInputFileNames:
245 msg.info('Ignoring input file: %s', file)
246 else:
247 matchedOutputFileNames.append(file)
248 return matchedOutputFileNames
249
250 # merge multiple BS files into a single BS file
251 def _mergeBSfiles(self, inputFiles, outputFile):
252 msg.info(f'Merging multiple BS files ({inputFiles}) into {outputFile}')
253 # Write the list of input files to a text file, to use it as a input for file_merging
254 mergeBSFileList = 'RAWFileMerge.list'
255 try:
256 with open(mergeBSFileList, 'w') as BSFileList:
257 for fname in inputFiles:
258 BSFileList.write(f'{fname}\n')
259 except OSError as e:
260 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
261 f'Got an error when writing list of BS files to {mergeBSFileList}: {e}')
262
263 # The user should never need to use this directly, but check it just in case...
264 if not outputFile.endswith('._0001.data'):
265 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
266 'Output merged BS filename must end in "._0001.data"')
267 else:
268 # We need to remove the suffix to pass the output as an argument to file_merging
269 outputFile = outputFile.split('._0001.data')[0]
270
271 mergeBSFailure = 0
272 try:
273 cmd = f'file_merging {mergeBSFileList} 0 {outputFile}'
274 msg.info('running command for merging (in original asetup env): %s', cmd)
275 mergeBSFailure = subprocess.call(cmd, shell=True)
276 msg.debug('file_merging return code %s', mergeBSFailure)
277 except OSError as e:
278 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
279 f'Exception raised when merging BS files using file_merging: {e}')
280 if mergeBSFailure != 0:
281 msg.error('file_merging returned error (%s) no merged BS file created', mergeBSFailure)
282 return 1
283 return 0
284
285
286 # run trigbs_extractStream.py to split a stream out of the BS file
287 # renames the split file afterwards
288 def _splitBSfile(self, streamsList, allStreamsFileName, splitFileName):
289 # merge list of streams
290 outputStreams = ','.join(str(stream) for stream in streamsList)
291 msg.info('Splitting stream %s from BS file', outputStreams)
292 splitStreamFailure = 0
293 try:
294 cmd = 'trigbs_extractStream.py -s ' + outputStreams + ' ' + allStreamsFileName
295 msg.info('running command for splitting (in original asetup env): %s', cmd)
296 splitStreamFailure = subprocess.call(cmd, shell=True)
297 msg.debug('trigbs_extractStream.py splitting return code %s', splitStreamFailure)
298 except OSError as e:
299 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
300 'Exception raised when selecting stream with trigbs_extractStream.py in file {0}: {1}'.format(allStreamsFileName, e))
301 if splitStreamFailure != 0:
302 msg.warning('trigbs_extractStream.py returned error (%s) no split BS file created', splitStreamFailure)
303 return 1
304 else:
305 expectedStreamFileName = '*._HLTMPPy_RAW._*.data'
306 # list of filenames of files matching expectedStreamFileName
307 matchedOutputFileName = self._findOutputFiles(expectedStreamFileName)
308 if(len(matchedOutputFileName)):
309 self._renamefile(matchedOutputFileName[0], splitFileName)
310 return 0
311 else:
312 msg.error('trigbs_extractStream.py did not created expected file (%s)', expectedStreamFileName)
313 return 1
314
315 # rename a created file - used to overwrite filenames from athenaHLT into the requested argument name
316 def _renamefile(self, currentFileName, newFileName):
317 msg.info('Renaming file from %s to %s', currentFileName, newFileName)
318 try:
319 os.rename(currentFileName, newFileName)
320 except OSError as e:
321 msg.error('Exception raised when renaming {0} #to {1}: {2}'.format(currentFileName, newFileName, e))
322 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
323 'Exception raised when renaming {0} #to {1}: {2}'.format(currentFileName, newFileName, e))
324
    def postExecute(self):
        """Post-execution steps for the trigger BS-BS substep.

        - Scans the log file for HLTMPPU child failures and propagates a
          failure to the mother return code
        - Merges the athenaHLT child log files into the main log, counting
          accepted/rejected events on the way
        - Renames/merges expert-monitoring.root files into the
          outputHIST_HLTMONFile output, if requested
        - Finds the BS files written by athenaHLT, merging/splitting/renaming
          them as requested
        - Runs the debug stream postRun analysis if requested
        """

        # Adding check for HLTMPPU.*Child Issue in the log file
        # - Throws an error message if there so we catch that the child died
        # - Also sets the return code of the mother process to mark the job as failed
        # - Is based on trfValidation.scanLogFile
        log = self._logFileName
        msg.debug('Now scanning logfile {0} for HLTMPPU Child Issues'.format(log))
        # Using the generator so that lines can be grabbed by subroutines if needed for more reporting

        # Count the number of rejected events
        rejected = 0
        # Count the number of accepted events
        accepted = 0

        try:
            myGen = lineByLine(log, substepName=self._substep)
        except IOError as e:
            msg.error('Failed to open transform logfile {0}: {1:s}'.format(log, e))
        # NOTE(review): if the logfile failed to open above, myGen is never
        # assigned and the loop below raises NameError - confirm and guard
        for line, lineCounter in myGen:
            # Check to see if any of the hlt children had an issue
            if 'Child Issue' in line:
                try:
                    signal = int((re.search('signal ([0-9]*)', line)).group(1))
                except AttributeError:
                    # signal not found in message, so return 1 to highlight failure
                    signal = 1
                msg.error('Detected issue with HLTChild, setting mother return code to %s', signal)
                self._rc = signal

        # Merge child log files into parent log file
        # is needed to make sure all child log files are scanned
        # files are found by searching whole folder rather than relying on nprocs being defined
        try:
            # open original log file (log.BSRDOtoRAW) to merge child files into
            with open(self._logFileName, 'a') as merged_file:
                for file in os.listdir('.'):
                    # expected child log files should be of the format athenaHLT:XX.out and .err
                    if fnmatch.fnmatch(file, 'athenaHLT:*'):
                        msg.info('Merging child log file (%s) into %s', file, self._logFileName)
                        with open(file) as log_file:
                            # write header information ### Output from athenaHLT:XX.out/err ###
                            merged_file.write('### Output from {} ###\n'.format(file))
                            # write out file line by line
                            for line in log_file:
                                merged_file.write(line)
                                # Check for rejected events in log file
                                # NOTE(review): the counters parse a fixed
                                # column (line[14:]) of the rejected:/accepted:
                                # log lines - confirm the log format is stable
                                if 'rejected:' in line and int(line[14]) != 0:
                                    # Add the number of rejected events
                                    rejected += int(line[14:])
                                # Check for accepted events in log file
                                if 'accepted:' in line and int(line[14]) != 0:
                                    # Add the number of accepted events
                                    accepted += int(line[14:])
                                if re.search('DFDcmEmuSession.* Communication error', line) or re.search('DFDcmEmuSession.* No new event provided within the timeout limit', line):
                                    msg.error('Caught DFDcmEmuSession error, aborting job')
                                    self._rc = 1

            if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
                # Add the HLT_accepted_events and HLT_rejected_events histograms to the output file
                dbgStream.getHltDecision(accepted, rejected, self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value[0])

        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                            'Exception raised when merging log files into {0}: {1}'.format(self._logFileName, e))

        msg.info("Check for expert-monitoring.root file")
        # the BS-BS step generates the files:
        # - expert-monitoring.root (from mother process)
        # - athenaHLT_workers/*/expert-monitoring.root (from child processes)
        # to save on panda it needs to be renamed via the outputHIST_HLTMONFile argument
        expectedFileName = 'expert-monitoring.root'

        # first check athenaHLT step actually completed
        if self._rc != 0:
            msg.info('HLT step failed (with status %s) so skip HIST_HLTMON filename check', self._rc)
        # next check argument is in dictionary as a requested output
        elif 'outputHIST_HLTMONFile' in self.conf.argdict:

            # rename the mother file
            expectedMotherFileName = 'expert-monitoring-mother.root'
            if(os.path.isfile(expectedFileName)):
                msg.info('Renaming %s to %s', expectedFileName, expectedMotherFileName)
                try:
                    os.rename(expectedFileName, expectedMotherFileName)
                except OSError as e:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                    'Exception raised when renaming {0} to {1}: {2}'.format(expectedFileName, expectedMotherFileName, e))
            else:
                msg.error('HLTMON argument defined but mother %s not created', expectedFileName)

            # merge worker files
            expectedWorkerFileName = 'athenaHLT_workers/athenaHLT-01/' + expectedFileName
            if(os.path.isfile(expectedWorkerFileName) and os.path.isfile(expectedMotherFileName)):
                msg.info('Merging worker and mother %s files to %s', expectedFileName, self.conf.argdict['outputHIST_HLTMONFile'].value[0])
                try:
                    # have checked that at least one worker file exists
                    cmd = 'hadd ' + self.conf.argdict['outputHIST_HLTMONFile'].value[0] + ' athenaHLT_workers/*/expert-monitoring.root expert-monitoring-mother.root'
                    subprocess.call(cmd, shell=True)
                except OSError as e:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                    'Exception raised when merging worker and mother {0} files to {1}: {2}'.format(expectedFileName, self.conf.argdict['outputHIST_HLTMONFile'].value[0], e))
            else:
                msg.error('HLTMON argument defined %s but worker %s not created', self.conf.argdict['outputHIST_HLTMONFile'].value[0], expectedFileName)

        else:
            msg.info('HLTMON argument not defined so skip %s check', expectedFileName)

        msg.info("Search for created BS files, and rename if single file found")
        # The following is needed to handle the BS file being written with a different name (or names)
        # base is from either the tmp value created by the transform or the value entered by the user

        argInDict = {}
        if self._rc != 0:
            msg.error('HLT step failed (with status %s) so skip BS filename check', self._rc)
        elif 'BS' in self.conf.dataDictionary or 'DRAW_TRIGCOST' in self.conf.dataDictionary or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
            expectedOutputFileName = '*_HLTMPPy_RAW.pool.root.*.data'
            # list of filenames of files matching expectedOutputFileName
            matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)

            # check there are file matches and rename appropriately
            if len(matchedOutputFileNames) == 0:
                msg.error('No BS files created with expected name: %s - please check for earlier failures', expectedOutputFileName)
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                f'No BS files created with expected name: {expectedOutputFileName} - please check for earlier failures')
            else:
                # if only one BS file was created
                if len(matchedOutputFileNames) == 1:
                    msg.info('Single BS file found: will split (if requested) and rename the file')
                    BSFile = matchedOutputFileNames[0]

                # if more than one file BS was created, then merge them
                else:
                    msg.info('Multiple BS files found. A single BS file is required by the next transform steps, so they will be merged. Will split the merged file (if requested) and rename the file')
                    mergedBSFile = matchedOutputFileNames[0].split('._0001.data')[0] + '.mrg._0001.data'  # a specific format is required to run file_merging

                    mergeFailed = self._mergeBSfiles(matchedOutputFileNames, mergedBSFile)
                    if(mergeFailed):
                        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                        'Did not produce a merged BS file with file_merging')

                    BSFile = 'tmp.BS.mrg'
                    msg.info(f'Renaming temporary merged BS file to {BSFile}')
                    self._renamefile(mergedBSFile, BSFile)

                # First check if we want to produce the COST DRAW output
                if 'DRAW_TRIGCOST' in self.conf.dataDictionary:
                    splitFailed = self._splitBSfile(['CostMonitoring'], BSFile, self.conf.dataDictionary['DRAW_TRIGCOST'].value[0])
                    if(splitFailed):
                        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                        'Did not produce any BS file when selecting CostMonitoring stream with trigbs_extractStream.py in file')

                # Run debug step for all streams
                if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
                    self._postExecuteDebug(BSFile)

                # Rename BS file if requested
                if 'BS' in self.conf.dataDictionary:
                    argInDict = self.conf.dataDictionary['BS']
                    # If a stream (not All) is selected, then slim the orignal (many stream) BS output to the particular stream
                    if 'streamSelection' in self.conf.argdict and self.conf.argdict['streamSelection'].value[0] != "All":
                        splitEmpty = self._splitBSfile(self.conf.argdict['streamSelection'].value, BSFile, argInDict.value[0])
                        if(splitEmpty):
                            msg.info('Did not produce any BS file when selecting stream with trigbs_extractStream.py in file')
                            # If splitEmpty==1, the chosen streams contained no events
                            # then run the command to produce an empty BS file and rename it to RAW.pool.root
                            # this stops non-zero exit code for rejected events
                            cmd_splitFailed = 'trigbs_failedStreamSelection.py ' + BSFile
                            msg.info('running command for creating empty file: %s', cmd_splitFailed)
                            subprocess.call(cmd_splitFailed, shell=True)
                            # Rename the empty file to "RAW.pool.root" to prevent failure
                            # expected filename will be of form: T0debug.runnumber.unknown_debug.unknown.RAW._lb0000._TRF._0001.data
                            runnumber = eformat.EventStorage.pickDataReader(BSFile).runNumber()
                            expectedOutputFileName = 'T0debug.00'+str(runnumber)+'.unknown_debug.unknown.RAW._lb0000._TRF._0001.data'
                            # rename the file to RAW.pool.root, this file will contain 0 events
                            self._renamefile(expectedOutputFileName, argInDict.value[0])
                    else:
                        msg.info('Stream "All" requested, so not splitting BS file')
                        self._renamefile(BSFile, argInDict.value[0])
                else:
                    msg.info('BS output filetype not defined so skip renaming BS')
        else:
            msg.info('BS, DRAW_TRIGCOST or HIST_DEBUGSTREAMMON output filetypes not defined so skip BS post processing')

        msg.info('Now run athenaExecutor:postExecute')
        super(trigRecoExecutor, self).postExecute()

        if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
            # Do debug stream postRun step for BS file that contains events after the streamSelection
            fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
            dbgStream.dbgPostRun(argInDict.value[0], fileNameDbg[0], self.conf.argdict, isSplitStream=True)
517
518
519 def _postExecuteDebug(self, outputBSFile):
520 # Run postRun step debug stream analysis if output BS file and output histogram are set
521 msg.info("debug stream analysis in postExecute")
522
523 # Set file name for debug stream analysis output
524 fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
525 msg.info('outputHIST_DEBUGSTREAMMONFile argument is {0}'.format(fileNameDbg))
526
527 if(os.path.isfile(fileNameDbg[0])):
528 # Keep filename if not defined
529 msg.info('Will use file created in PreRun step {0}'.format(fileNameDbg))
530 else:
531 msg.info('No file created in PreRun step {0}'.format(fileNameDbg))
532
533 # Do debug stream postRun step
534 dbgStream.dbgPostRun(outputBSFile, fileNameDbg[0], self.conf.argdict)
535
if(pathvar)
Class holding the update to an environment that will be passed on to an executor.
Definition trfEnv.py:15
Base class for execution exceptions.
_splitBSfile(self, streamsList, allStreamsFileName, splitFileName)
_renamefile(self, currentFileName, newFileName)
_postExecuteDebug(self, outputBSFile)
preExecute(self, input=set(), output=set())
_mergeBSfiles(self, inputFiles, outputFile)
STL class.
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:179
Transform execution functions.
Module for transform exit codes.
Transform utility functions.