ATLAS Offline Software
Loading...
Searching...
No Matches
trigRecoExe.py
Go to the documentation of this file.
1#!/usr/bin/env python
2
3# Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
4
5# @brief: Trigger executor to call base transforms
6# @details: Based on athenaExecutor with some modifications
7# @author: Mark Stockton
8
9import os
10import fnmatch
11import re
12import subprocess
13
14from PyJobTransforms.trfExe import athenaExecutor
15
16# imports for preExecute
17from PyJobTransforms.trfUtils import asetupReport, cvmfsDBReleaseCheck, unpackDBRelease, setupDBRelease, lineByLine, asetupReleaseIsOlderThan
18import PyJobTransforms.trfEnv as trfEnv
19import PyJobTransforms.trfExceptions as trfExceptions
20from PyJobTransforms.trfExitCodes import trfExit as trfExit
21import TrigTransform.dbgAnalysis as dbgStream
22from TrigTransform.trigTranslate import getTranslated as getTranslated
23
# Setup logging here: module logger for this executor, nested under the
# PyJobTransforms logger hierarchy. eformat is used when post-processing BS files.
import logging
import eformat
msg = logging.getLogger("PyJobTransforms." + __name__)
27
class trigRecoExecutor(athenaExecutor):
    """Trig_reco_tf.py executor for the BS->BS step (aka running the trigger).

    Based on athenaExecutor, but the executable is taken from the ``trigExe``
    argument (e.g. athenaHLT/athenaEF) rather than athena. It is used to set up
    input files/arguments and to change output filenames:
      - preExecute translates the runargs file into command-line options,
      - postExecute merges the child log files into the substep log, builds the
        HIST_HLTMON output and merges/splits/renames the BS files written by the
        HLT to the names requested by the transform arguments.
    """

    # Pattern for output files produced by athenaHLT/EF
    expectedOutputFileName = '*SingleStream.daq.RAW._*.data'

    # Output data types whose production requires post-processing of the HLT BS output
    _bsOutputTypes = ('BS', 'DRAW_TRIGCOST', 'HIST_DEBUGSTREAMMON')

    # Signal number reported in an HLTMPPU "Child Issue" log message
    # ('+' rather than '*' so an empty match can never reach int())
    _childSignalPattern = re.compile(r'signal ([0-9]+)')

    # DCM emulator failures seen in the child log files; either one aborts the job
    _dcmErrorPattern = re.compile(r'DFDcmEmuSession.* (?:Communication error|No new event provided within the timeout limit)')

    def _substepArgValue(self, argName):
        """Return the value of transform argument ``argName`` for this executor/substep."""
        return self.conf.argdict[argName].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)

    # preExecute is based on athenaExecutor but with key changes:
    # - removed athenaMP detection
    # - removed environment so does not require the noimf notcmalloc flags
    # - added swap of argument name for runargs file
    def preExecute(self, input = set(), output = set()):
        """Prepare the trigger substep for execution and write its wrapper script.

        :param input: data types read by this substep (keys of ``self.conf.dataDictionary``)
        :param output: data types written by this substep
        :raises trfExceptions.TransformExecutionException:
            TRF_NOEVENTS if skipEvents would skip all events of an input,
            TRF_OUTPUT_FILE_ERROR if BS files with the expected output name already exist,
            TRF_EXEC_SETUP_FAIL if a container is needed but no release is known,
            TRF_ARG_ERROR (from _prepAthenaCommandLine) if --CA is combined with reading from the DB.
        """
        msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

        # setsid needed to fix process-group id of child processes to be the same as mother process (ATR-20513)
        self._exe = 'setsid ' + self.conf.argdict['trigExe'].value

        # Check we actually have events to process!
        self._checkHasEventsToProcess(input)

        # outputFiles is also needed below when translating the runargs for athenaHLT/athenaEF,
        # so it must be defined even when there is no skeleton
        outputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in output}

        if self._skeleton is not None:
            inputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in input}
            # See if we have any 'extra' file arguments
            for dataArg in self.conf.dataDictionary.values():
                if dataArg.io == 'input' and self._name in dataArg.executor:
                    inputFiles[dataArg.subtype] = dataArg

            msg.info('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

            # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
            self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
                                                                           output = outputFiles)

        # Add input/output file information to the executor metadata
        if input:
            self._extraMetadata['inputs'] = list(input)
        if output:
            self._extraMetadata['outputs'] = list(output)

        asetupString = None
        if 'asetup' in self.conf.argdict:
            asetupString = self._substepArgValue('asetup')
        else:
            msg.info('Asetup report: {0}'.format(asetupReport()))

        # If legacy release, bring up centos7 container (or the OS explicitly requested)
        OSSetupString = self._selectContainerOS(asetupString, None)

        dbsetup = self._resolveDBReleaseSetup()

        # Look for environment updates and prepare the athena command line
        self._envUpdate = trfEnv.environmentUpdate()
        # above is needed by _prepAthenaCommandLine, but setStandardEnvironment is deliberately not
        # called so the environment does not include imf or tcmalloc
        # self._envUpdate.setStandardEnvironment(self.conf.argdict)
        self._prepAthenaCommandLine()

        # translate relevant parts from the runargs file
        if 'athenaHLT' in self._exe or 'athenaEF' in self._exe:
            self._cmd.remove('runargs.BSRDOtoRAW.py')
            # get list of translated arguments
            optionList = getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output = outputFiles)
            self._cmd.extend(optionList)
            # updates for CA
            if self._isCAEnabled():
                msg.info("Running in CA mode")
                # we don't use the runargs file so add the JO and preExecs to the command line
                self._cmd.append(self._skeletonCA)
                if 'preExec' in self.conf.argdict:
                    self._cmd.extend(self._substepArgValue('preExec'))
                msg.info('Command adjusted for CA to %s', self._cmd)
            else:
                msg.info("Running in legacy mode")

        # Run preRun step debug stream analysis if output histogram are set
        if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
            # Do debug stream preRun step and get asetup string from debug stream input files
            dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary['BS_RDO'], self.conf.dataDictionary['HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
            # Setup asetup from debug stream
            # if no --asetup r2b:string was given and is not running with tzero/software/patches as TestArea
            if asetupString is None and dbgAsetupString is not None:
                asetupString = dbgAsetupString
                msg.info('Will use asetup string for debug stream analysis %s', dbgAsetupString)
                # The release changed, so re-evaluate which container OS is needed
                OSSetupString = self._selectContainerOS(asetupString, OSSetupString)

            # Set database in command line if it was missing
            if 'useDB' in self.conf.argdict and 'DBserver' not in self.conf.argdict and dbAlias:
                msg.warning("Database alias will be set to %s", dbAlias)
                self._cmd.append("--db-server " + dbAlias)
        else:
            msg.info("Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")

        # The following is needed to avoid conflicts in finding BS files produced by running the HLT step
        # and those already existing in the working directory
        if any(dataType in self.conf.dataDictionary for dataType in self._bsOutputTypes):
            # list of filenames of files matching expectedOutputFileName
            matchedOutputFileNames = self._findOutputFiles(self.expectedOutputFileName)
            # check there are no file matches
            if matchedOutputFileNames:
                msg.error(f'Directory already contains files with expected output name format ({self.expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                    f'Directory already contains files with expected output name format {self.expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')

        # Sanity check:
        if OSSetupString is not None and asetupString is None:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')

        # Call athenaExecutor's parent (deliberately skipping athenaExecutor.preExecute),
        # as the above replaces what athenaExecutor would have done
        super(athenaExecutor, self).preExecute(input, output)

        # Now we always write a wrapper, because it's very convenient for re-running individual substeps
        # This will have asetup and/or DB release setups in it
        # Do this last in this preExecute as the _cmd needs to be finalised
        msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
        self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
        msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))

    def _checkHasEventsToProcess(self, inputTypes):
        """Raise if skipEvents would skip every event of one of the inputs.

        Only checked when the input event test is enabled and skipEvents is set for this substep.
        Inputs whose event count is not an int are skipped with a warning.

        :param inputTypes: data types read by this substep
        :raises trfExceptions.TransformExecutionException: TRF_NOEVENTS if skipEvents >= events in an input
        """
        if not (self._inputEventTest and 'skipEvents' in self.conf.argdict):
            return
        skipEvents = self._substepArgValue('skipEvents')
        if skipEvents is None:
            return

        msg.debug('Will test for events to process')
        for dataType in inputTypes:
            inputEvents = self.conf.dataDictionary[dataType].nentries
            msg.debug('Got {} events for {}'.format(inputEvents, dataType))
            if not isinstance(inputEvents, int):
                msg.warning('Are input events countable? Got nevents={} so disabling event count check for this input'.format(inputEvents))
            elif skipEvents >= inputEvents:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
                    'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(skipEvents, inputEvents, dataType))

    def _selectContainerOS(self, asetupString, OSSetupString):
        """Return the OS of the container the substep must run in, or None to run natively.

        :param asetupString: release the substep will run with, or None if not known
        :param OSSetupString: container OS chosen so far; returned unchanged unless overridden here
        :returns: "centos7" for releases older than 24 when not already on centos7,
            the ``runInContainer`` argument if given (always takes precedence),
            otherwise ``OSSetupString``
        """
        # If legacy release, bring up centos7 container
        if asetupString is not None:
            legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
            currentOS = os.environ['ALRB_USER_PLATFORM']
            if legacyOSRelease and "centos7" not in currentOS:
                OSSetupString = "centos7"
                msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))

        # allow overriding the container OS using a flag
        if 'runInContainer' in self.conf.argdict:
            OSSetupString = self._substepArgValue('runInContainer')
            msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))

        return OSSetupString

    def _resolveDBReleaseSetup(self):
        """Return the DBRelease setup to put in the wrapper, or None if no DBRelease was requested.

        A classic tarball (DBRelease-X.Y.Z.tar.gz) is unpacked and set up locally if readable;
        otherwise (or for a plain X.Y.Z / 'current' value) the release is looked up in cvmfs.
        """
        if 'DBRelease' not in self.conf.argdict:
            return None
        dbrelease = self._substepArgValue('DBRelease')
        if not dbrelease:
            return None

        dbsetup = None
        # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
        dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
        if dbdMatch:
            msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
            if not os.access(dbrelease, os.R_OK):
                msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                dbrelease = dbdMatch.group(1)
                dbsetup = cvmfsDBReleaseCheck(dbrelease)
            else:
                # Check if the DBRelease is setup
                unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                if unpacked:
                    # Now run the setup.py script to customise the paths to the current location...
                    setupDBRelease(dbsetup)
        else:
            # For cvmfs we want just the X.Y.Z release string (and also support 'current')
            dbsetup = cvmfsDBReleaseCheck(dbrelease)
        return dbsetup

    def _prepAthenaCommandLine(self):
        """Build the command line as athenaExecutor does, dropping the skeleton when reading from the DB.

        Reading from the DB is detected from ``--useDB`` or from ``--use-database``/``-b`` in athenaopts.
        The skeleton is removed from ``_topOptionsFiles`` (not from ``_skeleton``), otherwise the
        runargs file would be lost too.

        :raises trfExceptions.TransformExecutionException: TRF_ARG_ERROR if --CA is combined with reading from the DB
        """
        # When running from the DB, no skeleton file is needed
        msg.info("Before build command line check if reading from DB")

        # Check if expecting to run from DB
        removeSkeleton = False
        if 'useDB' in self.conf.argdict:
            removeSkeleton = True
        elif 'athenaopts' in self.conf.argdict:
            athenaopts = self.conf.argdict['athenaopts'].value
            if '--use-database' in athenaopts or '-b' in athenaopts:
                removeSkeleton = True

        if removeSkeleton and self._isCAEnabled():
            msg.error('Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_ERROR'),
                'Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')

        # Due to athenaExecutor code don't remove skeleton, otherwise lose runargs too
        # instead remove skeleton from _topOptionsFiles
        if removeSkeleton and self._skeleton is not None:
            msg.info('Read from DB: remove skeleton {0} from command'.format(self._skeleton))
            self._topOptionsFiles.remove(self._skeleton[0])
        else:
            msg.info('Not reading from DB: keep skeleton in command')

        # Now build command line as in athenaExecutor
        super(trigRecoExecutor, self)._prepAthenaCommandLine()

    def _findOutputFiles(self, pattern):
        """Return the sorted names of files in the current directory matching ``pattern``.

        Files that are inputs of the transform (any file of any input argument) are skipped,
        as they may live in the same folder and match the same pattern.

        :param pattern: fnmatch-style filename pattern
        """
        ignoreInputFileNames = set()
        for dataArg in self.conf.dataDictionary.values():
            if dataArg.io == 'input':
                ignoreInputFileNames.update(dataArg.value)

        matchedOutputFileNames = []
        # sorted so that the order (and e.g. the name of a merged file) does not depend on listdir order
        for fileName in sorted(os.listdir('.')):
            if not fnmatch.fnmatch(fileName, pattern):
                continue
            if fileName in ignoreInputFileNames:
                msg.info('Ignoring input file: %s', fileName)
            else:
                matchedOutputFileNames.append(fileName)
        return matchedOutputFileNames

    def _mergeBSfiles(self, inputFiles, outputFile):
        """Merge several BS files into one with file_merging.

        :param inputFiles: BS files to merge
        :param outputFile: merged file name; must end in "._0001.data"
        :returns: 0 on success, 1 if file_merging returned an error
        :raises trfExceptions.TransformExecutionException: TRF_OUTPUT_FILE_ERROR on a bad output name,
            or if the file list cannot be written or file_merging cannot be run
        """
        msg.info(f'Merging multiple BS files ({inputFiles}) into {outputFile}')

        # The user should never need to use this directly, but check it just in case...
        mergedSuffix = '._0001.data'
        if not outputFile.endswith(mergedSuffix):
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                'Output merged BS filename must end in "._0001.data"')
        # file_merging takes the output name without the suffix
        outputPrefix = outputFile[:-len(mergedSuffix)]

        # Write the list of input files to a text file, to use it as a input for file_merging
        mergeBSFileList = 'RAWFileMerge.list'
        try:
            with open(mergeBSFileList, 'w') as BSFileList:
                BSFileList.writelines(f'{fname}\n' for fname in inputFiles)
        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                f'Got an error when writing list of BS files to {mergeBSFileList}: {e}') from e

        try:
            cmd = f'file_merging {mergeBSFileList} 0 {outputPrefix}'
            msg.info('running command for merging (in original asetup env): %s', cmd)
            mergeBSFailure = subprocess.call(cmd, shell=True)
            msg.debug('file_merging return code %s', mergeBSFailure)
        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                f'Exception raised when merging BS files using file_merging: {e}') from e
        if mergeBSFailure != 0:
            msg.error('file_merging returned error (%s) no merged BS file created', mergeBSFailure)
            return 1
        return 0

    def _splitBSfile(self, streamsList, allStreamsFileName, splitFileName):
        """Extract the given streams from a BS file with trigbs_extractStream.py and rename the result.

        :param streamsList: stream names to keep
        :param allStreamsFileName: BS file containing all streams
        :param splitFileName: name to give the extracted file
        :returns: 0 on success, 1 if no split file was produced
        :raises trfExceptions.TransformExecutionException: TRF_OUTPUT_FILE_ERROR if the command cannot be
            run or the produced file cannot be renamed
        """
        # merge list of streams
        outputStreams = ','.join(str(stream) for stream in streamsList)
        msg.info('Splitting stream %s from BS file', outputStreams)
        try:
            cmd = f'trigbs_extractStream.py -s {outputStreams} {allStreamsFileName}'
            msg.info('running command for splitting (in original asetup env): %s', cmd)
            splitStreamFailure = subprocess.call(cmd, shell=True)
            msg.debug('trigbs_extractStream.py splitting return code %s', splitStreamFailure)
        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                'Exception raised when selecting stream with trigbs_extractStream.py in file {0}: {1}'.format(allStreamsFileName, e)) from e

        if splitStreamFailure != 0:
            msg.warning('trigbs_extractStream.py returned error (%s) no split BS file created', splitStreamFailure)
            return 1

        # If more than one stream selected, trigbs_extractStream produces an "accepted" stream file
        streamName = outputStreams if len(streamsList) == 1 else 'accepted'
        expectedStreamFileName = f'*_{streamName}.*.RAW._*.data'
        matchedOutputFileNames = self._findOutputFiles(expectedStreamFileName)
        if not matchedOutputFileNames:
            msg.error('trigbs_extractStream.py did not create expected file (%s)', expectedStreamFileName)
            return 1
        self._renamefile(matchedOutputFileNames[0], splitFileName)
        return 0

    def _renamefile(self, currentFileName, newFileName):
        """Rename a created file to match the requested argument name.

        :raises trfExceptions.TransformExecutionException: TRF_OUTPUT_FILE_ERROR if the rename fails
        """
        msg.info('Renaming file from %s to %s', currentFileName, newFileName)
        try:
            os.rename(currentFileName, newFileName)
        except OSError as e:
            errorMessage = 'Exception raised when renaming {0} to {1}: {2}'.format(currentFileName, newFileName, e)
            msg.error(errorMessage)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'), errorMessage) from e

    def postExecute(self):
        """Post-process the outputs of the trigger step, then run athenaExecutor.postExecute.

        In order:
          1. scan the substep log for HLTMPPU child issues (sets the return code),
          2. merge the athenaHLT child log files into the substep log,
          3. build the HIST_HLTMON output from the expert-monitoring files,
          4. merge/split/rename the BS output files,
          5. run athenaExecutor.postExecute,
          6. run the debug stream postRun step on the stream-selected BS output, if any.
        """
        self._scanLogForChildIssues()
        self._mergeChildLogFiles()
        self._mergeHLTMonFiles()
        bsOutputArg = self._postProcessBSOutput()

        msg.info('Now run athenaExecutor:postExecute')
        super(trigRecoExecutor, self).postExecute()

        if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
            if bsOutputArg is None:
                # No BS output file was produced/requested (e.g. the HLT step failed)
                msg.warning('No BS output file available, so skip debug stream postRun step for the stream-selected BS file')
            else:
                # Do debug stream postRun step for BS file that contains events after the streamSelection
                fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
                dbgStream.dbgPostRun(bsOutputArg.value[0], fileNameDbg[0], self.conf.argdict, isSplitStream=True)

    def _scanLogForChildIssues(self):
        """Set ``self._rc`` if an HLTMPPU child reported an issue in the substep log file.

        The return code becomes the signal number quoted in the message, or 1 if none is found.
        Based on trfValidation.scanLogFile.
        """
        log = self._logFileName
        msg.debug('Now scanning logfile {0} for HLTMPPU Child Issues'.format(log))
        try:
            # The iteration is inside the try: lineByLine may only open the file once iterated
            # (NOTE(review): it appears to be a generator - confirm in trfUtils)
            for line, _lineCounter in lineByLine(log, substepName=self._substep):
                # Check to see if any of the hlt children had an issue
                if 'Child Issue' not in line:
                    continue
                signalMatch = self._childSignalPattern.search(line)
                # signal not found in message, so use 1 to highlight failure
                signal = int(signalMatch.group(1)) if signalMatch else 1
                msg.error('Detected issue with HLTChild, setting mother return code to %s', signal)
                self._rc = signal
        except OSError as e:
            msg.error('Failed to open transform logfile {0}: {1}'.format(log, e))

    @staticmethod
    def _parseEventCount(line):
        """Return the event count written from column 14 of a child 'accepted:'/'rejected:' line.

        Returns 0 when the text from column 14 is not an integer (for example an unrelated message
        that happens to contain 'accepted:'), instead of aborting the log merge.
        """
        try:
            return int(line[14:])
        except ValueError:
            return 0

    def _mergeChildLogFiles(self):
        """Append the athenaHLT child log files to the substep log and collect child statistics.

        Child files are found by searching the whole folder rather than relying on nprocs being defined.
        Accepted/rejected event counts are summed and, if HIST_DEBUGSTREAMMON is requested, written out
        via dbgStream.getHltDecision. A DFDcmEmuSession error in any child log sets ``self._rc`` to 1.

        :raises trfExceptions.TransformExecutionException: TRF_OUTPUT_FILE_ERROR on file I/O errors
        """
        rejected = 0
        accepted = 0
        try:
            # open original log file (log.BSRDOtoRAW) to merge child files into
            with open(self._logFileName, 'a') as merged_file:
                for fileName in sorted(os.listdir('.')):
                    # expected child log files should be of the format athenaHLT:XX.out and .err
                    if not fnmatch.fnmatch(fileName, 'athenaHLT:*'):
                        continue
                    msg.info('Merging child log file (%s) into %s', fileName, self._logFileName)
                    with open(fileName) as log_file:
                        # write header information ### Output from athenaHLT:XX.out/err ###
                        merged_file.write('### Output from {} ###\n'.format(fileName))
                        for line in log_file:
                            merged_file.write(line)
                            if 'rejected:' in line:
                                rejected += self._parseEventCount(line)
                            if 'accepted:' in line:
                                accepted += self._parseEventCount(line)
                            if self._dcmErrorPattern.search(line):
                                msg.error('Caught DFDcmEmuSession error, aborting job')
                                self._rc = 1

            if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
                # Add the HLT_accepted_events and HLT_rejected_events histograms to the output file
                dbgStream.getHltDecision(accepted, rejected, self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value[0])

        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                'Exception raised when merging log files into {0}: {1}'.format(self._logFileName, e)) from e

    def _mergeHLTMonFiles(self):
        """Build the HIST_HLTMON output from the expert-monitoring.root files of the HLT step.

        The BS-BS step generates expert-monitoring.root (mother process) and
        athenaHLT_workers/*/expert-monitoring.root (child processes). The mother file is renamed
        and hadd-ed with the worker files into the outputHIST_HLTMONFile name so it can be saved.
        Skipped if the HLT step failed or the argument is not given.

        :raises trfExceptions.TransformExecutionException: TRF_OUTPUT_FILE_ERROR if renaming or running hadd fails
        """
        msg.info("Check for expert-monitoring.root file")
        expectedFileName = 'expert-monitoring.root'

        # first check if trigger step actually completed
        if self._rc != 0:
            msg.info('HLT step failed (with status %s) so skip HIST_HLTMON filename check', self._rc)
            return
        # next check argument is in dictionary as a requested output
        if 'outputHIST_HLTMONFile' not in self.conf.argdict:
            msg.info('HLTMON argument not defined so skip %s check', expectedFileName)
            return
        hltMonFileName = self.conf.argdict['outputHIST_HLTMONFile'].value[0]

        # rename the mother file
        expectedMotherFileName = 'expert-monitoring-mother.root'
        if os.path.isfile(expectedFileName):
            msg.info('Renaming %s to %s', expectedFileName, expectedMotherFileName)
            try:
                os.rename(expectedFileName, expectedMotherFileName)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                    'Exception raised when renaming {0} to {1}: {2}'.format(expectedFileName, expectedMotherFileName, e)) from e
        else:
            msg.error('HLTMON argument defined but mother %s not created', expectedFileName)

        # merge worker files
        expectedWorkerFileName = 'athenaHLT_workers/athenaHLT-01/' + expectedFileName
        if os.path.isfile(expectedWorkerFileName) and os.path.isfile(expectedMotherFileName):
            msg.info('Merging worker and mother %s files to %s', expectedFileName, hltMonFileName)
            try:
                # have checked that at least one worker file exists; the shell expands the worker glob
                cmd = 'hadd ' + hltMonFileName + ' athenaHLT_workers/*/expert-monitoring.root expert-monitoring-mother.root'
                haddFailure = subprocess.call(cmd, shell=True)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                    'Exception raised when merging worker and mother {0} files to {1}: {2}'.format(expectedFileName, hltMonFileName, e)) from e
            if haddFailure != 0:
                msg.error('hadd returned error (%s) when merging %s files to %s', haddFailure, expectedFileName, hltMonFileName)
        else:
            msg.error('HLTMON argument defined %s but worker %s not created', hltMonFileName, expectedFileName)

    def _postProcessBSOutput(self):
        """Find the BS files written by the HLT step and turn them into the requested outputs.

        Multiple BS files are first merged into one. Then, as requested: the CostMonitoring stream is
        split into DRAW_TRIGCOST, the debug stream postRun step is run, and the file is split by
        streamSelection (or kept whole for "All") and renamed to the BS output name.

        :returns: the BS output argument if a BS output was produced, otherwise None
        :raises trfExceptions.TransformExecutionException: TRF_OUTPUT_FILE_ERROR if no BS file was
            created, merging fails, or the CostMonitoring split fails
        """
        msg.info("Search for created BS files, and rename if single file found")
        # The following is needed to handle the BS file being written with a different name (or names)
        # base is from either the tmp value created by the transform or the value entered by the user
        if self._rc != 0:
            msg.error('HLT step failed (with status %s) so skip BS filename check', self._rc)
            return None
        if not any(dataType in self.conf.dataDictionary for dataType in self._bsOutputTypes):
            msg.info('BS, DRAW_TRIGCOST or HIST_DEBUGSTREAMMON output filetypes not defined so skip BS post processing')
            return None

        # list of filenames of files matching expectedOutputFileName
        matchedOutputFileNames = self._findOutputFiles(self.expectedOutputFileName)
        if not matchedOutputFileNames:
            msg.error('No BS files created with expected name: %s - please check for earlier failures', self.expectedOutputFileName)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                f'No BS files created with expected name: {self.expectedOutputFileName} - please check for earlier failures')

        if len(matchedOutputFileNames) == 1:
            msg.info('Single BS file found: will split (if requested) and rename the file')
            BSFile = matchedOutputFileNames[0]
        else:
            # if more than one file BS was created, then merge them
            msg.info('Multiple BS files found. A single BS file is required by the next transform steps, so they will be merged. Will split the merged file (if requested) and rename the file')
            mergedBSFile = matchedOutputFileNames[0].split('._0001.data')[0] + '.mrg._0001.data' # a specific format is required to run file_merging
            if self._mergeBSfiles(matchedOutputFileNames, mergedBSFile):
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                    'Did not produce a merged BS file with file_merging')
            BSFile = 'tmp.BS.mrg'
            msg.info(f'Renaming temporary merged BS file to {BSFile}')
            self._renamefile(mergedBSFile, BSFile)

        # First check if we want to produce the COST DRAW output
        if 'DRAW_TRIGCOST' in self.conf.dataDictionary:
            if self._splitBSfile(['CostMonitoring'], BSFile, self.conf.dataDictionary['DRAW_TRIGCOST'].value[0]):
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                    'Did not produce any BS file when selecting CostMonitoring stream with trigbs_extractStream.py in file')

        # Run debug step for all streams
        if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
            self._postExecuteDebug(BSFile)

        # Rename BS file if requested
        if 'BS' not in self.conf.dataDictionary:
            msg.info('BS output filetype not defined so skip renaming BS')
            return None
        bsOutputArg = self.conf.dataDictionary['BS']
        bsOutputName = bsOutputArg.value[0]

        # If a stream (not All) is selected, then slim the original (many stream) BS output to the particular stream
        if 'streamSelection' in self.conf.argdict and self.conf.argdict['streamSelection'].value[0] != "All":
            splitEmpty = self._splitBSfile(self.conf.argdict['streamSelection'].value, BSFile, bsOutputName)
            if splitEmpty:
                msg.info('Did not produce any BS file when selecting stream with trigbs_extractStream.py in file')
                # The chosen streams contained no events: produce an empty BS file and give it the
                # requested output name, so rejected events do not cause a non-zero exit code
                cmd_splitFailed = 'trigbs_failedStreamSelection.py ' + BSFile
                msg.info('running command for creating empty file: %s', cmd_splitFailed)
                emptyFileFailure = subprocess.call(cmd_splitFailed, shell=True)
                if emptyFileFailure != 0:
                    msg.warning('trigbs_failedStreamSelection.py returned error (%s)', emptyFileFailure)
                # expected filename will be of form: T0debug.runnumber.unknown_debug.unknown.RAW._lb0000._TRF._0001.data
                runnumber = eformat.EventStorage.pickDataReader(BSFile).runNumber()
                emptyFileName = 'T0debug.00'+str(runnumber)+'.unknown_debug.unknown.RAW._lb0000._TRF._0001.data'
                # rename the file to the requested output name, this file will contain 0 events
                self._renamefile(emptyFileName, bsOutputName)
        else:
            msg.info('Stream "All" requested, so not splitting BS file')
            self._renamefile(BSFile, bsOutputName)
        return bsOutputArg

    def _postExecuteDebug(self, outputBSFile):
        """Run the debug stream analysis postRun step on ``outputBSFile``.

        The histograms go to the first outputHIST_DEBUGSTREAMMONFile name; whether the preRun step
        already created that file is only logged.
        """
        msg.info("debug stream analysis in postExecute")

        # Set file name for debug stream analysis output
        fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
        msg.info('outputHIST_DEBUGSTREAMMONFile argument is {0}'.format(fileNameDbg))

        if os.path.isfile(fileNameDbg[0]):
            # Keep filename if not defined
            msg.info('Will use file created in PreRun step {0}'.format(fileNameDbg))
        else:
            msg.info('No file created in PreRun step {0}'.format(fileNameDbg))

        # Do debug stream postRun step
        dbgStream.dbgPostRun(outputBSFile, fileNameDbg[0], self.conf.argdict)
if(pathvar)
Class holding the update to an environment that will be passed on to an executor.
Definition trfEnv.py:15
Base class for execution exceptions.
_splitBSfile(self, streamsList, allStreamsFileName, splitFileName)
_renamefile(self, currentFileName, newFileName)
_postExecuteDebug(self, outputBSFile)
preExecute(self, input=set(), output=set())
_mergeBSfiles(self, inputFiles, outputFile)
STL class.
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:179
Transform execution functions.
Module for transform exit codes.
Transform utility functions.