ATLAS Offline Software
trigRecoExe.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
4 
5 # @brief: Trigger executor to call base transforms
6 # @details: Based on athenaExecutor with some modifications
7 # @author: Mark Stockton
8 
9 import os
10 import fnmatch
11 import re
12 import subprocess
13 import six
14 
15 from PyJobTransforms.trfExe import athenaExecutor
16 
17 # imports for preExecute
18 from PyJobTransforms.trfUtils import asetupReport, cvmfsDBReleaseCheck, unpackDBRelease, setupDBRelease, lineByLine, asetupReleaseIsOlderThan
19 import PyJobTransforms.trfEnv as trfEnv
20 import PyJobTransforms.trfExceptions as trfExceptions
21 from PyJobTransforms.trfExitCodes import trfExit as trfExit
22 import TrigTransform.dbgAnalysis as dbgStream
23 from TrigTransform.trigTranslate import getTranslated as getTranslated
24 
25 # Setup logging here
26 import logging, eformat
27 msg = logging.getLogger("PyJobTransforms." + __name__)
28 
29 # Trig_reco_tf.py executor for BS-BS step (aka running the trigger)
30 # used to setup input files/arguments and change output filenames
class trigRecoExecutor(athenaExecutor):
    """Executor for the BS->BS step of Trig_reco_tf.py (i.e. running the trigger).

    Based on athenaExecutor, but sets up the command line so that athenaHLT
    receives its arguments directly, and post-processes the files athenaHLT
    writes (child logs, expert-monitoring histograms, bytestream output) so
    that they end up under the output names requested from the transform.
    """

    # Matches an 'accepted: N' / 'rejected: N' event counter at the end of a
    # line of an athenaHLT child log file
    _hltDecisionCountRE = re.compile(r'(accepted|rejected):\s*([0-9]+)\s*$')

    # preExecute is based on athenaExecutor but with key changes:
    # - removed athenaMP detection
    # - removed environment so does not require the noimf notcmalloc flags
    # - added swap of argument name for runargs file so that athenaHLT reads it in
    def preExecute(self, input=None, output=None):
        """Prepare the command line and the wrapper script for this substep.

        :param input: set of input data types for this substep (default: none)
        :param output: set of output data types for this substep (default: none)
        :raises TransformExecutionException: if there are no events to process,
            the working directory already contains files named like the HLT
            bytestream output, a container is required but no release is
            known, or --CA is combined with reading the configuration from the DB.
        """
        # None defaults avoid a mutable set shared between calls
        input = set() if input is None else input
        output = set() if output is None else output
        msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

        # Check we actually have events to process!
        skipEvents = None
        if self._inputEventTest and 'skipEvents' in self.conf.argdict:
            skipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        if skipEvents is not None:
            msg.debug('Will test for events to process')
            for dataType in input:
                inputEvents = self.conf.dataDictionary[dataType].nentries
                msg.debug('Got {0} events for {1}'.format(inputEvents, dataType))
                if not isinstance(inputEvents, six.integer_types):
                    msg.warning('Are input events countable? Got nevents={0} so disabling event count check for this input'.format(inputEvents))
                elif skipEvents >= inputEvents:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
                        'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(skipEvents, inputEvents, dataType))

        # File arguments for this substep. outputFiles is also needed for the
        # athenaHLT argument translation below, so both are always defined,
        # not only when a skeleton is used
        inputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in input}
        outputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in output}

        # Write the skeleton file and prep athena
        if self._skeleton is not None:
            # See if we have any 'extra' file arguments
            for dataArg in self.conf.dataDictionary.values():
                if dataArg.io == 'input' and self._name in dataArg.executor:
                    inputFiles[dataArg.subtype] = dataArg

            msg.info('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

            # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
            self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input=inputFiles, output=outputFiles)

        # Add input/output file information - this can't be done in init as we
        # don't know what our inputs and outputs will be then
        if input:
            self._extraMetadata['inputs'] = list(input)
        if output:
            self._extraMetadata['outputs'] = list(output)

        asetupString = None
        if 'asetup' in self.conf.argdict:
            asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        else:
            msg.info('Asetup report: {0}'.format(asetupReport()))

        # If legacy release, bring up centos7 container (or the OS given via runInContainer)
        OSSetupString = self._containerOSSetup(asetupString)

        dbsetup = self._dbReleaseSetup()

        # Look for environment updates and prepare the athena command line.
        # The environment update is needed by _prepAthenaCommandLine, but
        # setStandardEnvironment is deliberately not called so that the imf
        # and tcmalloc settings are not included.
        self._envUpdate = trfEnv.environmentUpdate()
        self._prepAthenaCommandLine()

        # to get athenaHLT to read in the relevant parts from the runargs file we have to translate them
        if 'athenaHLT' in self._exe:
            self._cmd.remove('runargs.BSRDOtoRAW.py')
            # get list of translated arguments to be used by athenaHLT
            optionList = getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output=outputFiles)
            self._cmd.extend(optionList)
            # updates for CA
            if self._isCAEnabled():
                msg.info("Running in CA mode")
                # we don't use the runargs file with athenaHLT so add the JO and preExecs to the command line
                self._cmd.append(self._skeletonCA)
                if 'preExec' in self.conf.argdict:
                    # returnMyValue can return None (cf. the skipEvents check), so guard before extending
                    preExecs = self.conf.argdict['preExec'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
                    if preExecs:
                        self._cmd.extend(preExecs)
                msg.info('Command adjusted for CA to %s', self._cmd)
            else:
                msg.info("Running in legacy mode")

            # Run preRun step debug stream analysis if output histogram are set
            if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
                # Do debug stream preRun step and get asetup string (and DB alias) from debug stream input files
                dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary['BS_RDO'], self.conf.dataDictionary['HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
                # Setup asetup from debug stream
                # if no --asetup r2b:string was given and is not running with tzero/software/patches as TestArea
                if asetupString is None and dbgAsetupString is not None:
                    asetupString = dbgAsetupString
                    msg.info('Will use asetup string for debug stream analysis %s', dbgAsetupString)
                    # The release is now known, so re-evaluate the container OS
                    OSSetupString = self._containerOSSetup(asetupString)

                # Set database in command line if it was missing
                if 'useDB' in self.conf.argdict and 'DBserver' not in self.conf.argdict and dbAlias:
                    msg.warning("Database alias will be set to %s", dbAlias)
                    # option and value as separate command line arguments
                    self._cmd.extend(['--db-server', dbAlias])
            else:
                msg.info("Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")

            # The following is needed to avoid conflicts in finding BS files produced by running the HLT step
            # and those already existing in the working directory
            if 'BS' in self.conf.dataDictionary or 'DRAW_TRIGCOST' in self.conf.dataDictionary or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
                # expected string based on knowing that the format will be of form: ####._HLTMPPy_####.data
                expectedOutputFileName = '*_HLTMPPy_*.data'
                # list of filenames of files matching expectedOutputFileName
                matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)
                # check there are no file matches
                if matchedOutputFileNames:
                    msg.error(f'Directory already contains files with expected output name format ({expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                        f'Directory already contains files with expected output name format {expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')

        # Sanity check:
        if OSSetupString is not None and asetupString is None:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')

        # Call athenaExecutor parent as the above overrides what athenaExecutor would have done
        super(athenaExecutor, self).preExecute(input, output)

        # Now we always write a wrapper, because it's very convenient for re-running individual substeps
        # This will have asetup and/or DB release setups in it
        # Do this last in this preExecute as the _cmd needs to be finalised
        msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
        self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
        msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))

    def _containerOSSetup(self, asetupString):
        """Return the OS of the container to run this substep in, or None for no container.

        A centos7 container is chosen when asetupString refers to a release older
        than 24 and ALRB_USER_PLATFORM does not contain "centos7". An explicit
        runInContainer argument always takes precedence.

        :param asetupString: asetup string of the release to run, or None if unknown
        """
        OSSetupString = None
        if asetupString is not None:
            legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
            currentOS = os.environ['ALRB_USER_PLATFORM']
            if legacyOSRelease and "centos7" not in currentOS:
                OSSetupString = "centos7"
                msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))

        # allow overriding the container OS using a flag
        if 'runInContainer' in self.conf.argdict:
            OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
            msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
        return OSSetupString

    def _dbReleaseSetup(self):
        """Return the DBRelease setup for this substep, or None if no DBRelease is requested.

        A classic DBRelease-X.Y.Z.tar.gz tarball is unpacked and set up when it is
        readable; otherwise release X.Y.Z is looked up on cvmfs. Any other value
        (X.Y.Z, or 'current') is looked up on cvmfs directly.
        """
        if 'DBRelease' not in self.conf.argdict:
            return None
        dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        if not dbrelease:
            return None

        # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
        dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
        if not dbdMatch:
            # For cvmfs we want just the X.Y.Z release string (and also support 'current')
            return cvmfsDBReleaseCheck(dbrelease)

        msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
        if not os.access(dbrelease, os.R_OK):
            msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
            msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
            return cvmfsDBReleaseCheck(dbdMatch.group(1))

        # Check if the DBRelease is setup
        unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
        if unpacked:
            # Now run the setup.py script to customise the paths to the current location...
            setupDBRelease(dbsetup)
        return dbsetup

    def _prepAthenaCommandLine(self):
        """Build the athena command line, dropping the skeleton when reading from the DB.

        When running from the DB athenaHLT needs no skeleton file. Reading from
        the DB is assumed when useDB is given, or when athenaopts contains
        '--use-database' or '-b'.

        :raises TransformExecutionException: if CA mode is combined with reading from the DB
        """
        msg.info("Before build command line check if reading from DB")

        # Check if expecting to run from DB
        removeSkeleton = False
        if 'useDB' in self.conf.argdict:
            removeSkeleton = True
        elif 'athenaopts' in self.conf.argdict:
            v = self.conf.argdict['athenaopts'].value
            removeSkeleton = '--use-database' in v or '-b' in v

        if removeSkeleton and self._isCAEnabled():
            msg.error('Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_ERROR'),
                'Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')

        # Due to athenaExecutor code don't remove skeleton, otherwise lose runargs too
        # instead remove skeleton from _topOptionsFiles
        if removeSkeleton and self._skeleton is not None:
            msg.info('Read from DB: remove skeleton {0} from command'.format(self._skeleton))
            self._topOptionsFiles.remove(self._skeleton[0])
        else:
            msg.info('Not reading from DB: keep skeleton in command')

        # Now build command line as in athenaExecutor
        super()._prepAthenaCommandLine()

    def _findOutputFiles(self, pattern):
        """Return the sorted names of files in the current directory matching pattern.

        Input files of the transform are excluded, as they may be in the same
        directory and match the same pattern as the files produced by this step.

        :param pattern: fnmatch-style file name pattern
        """
        # every input file name (not just the first of each argument) is ignored
        ignoreInputFileNames = set()
        for dataArg in self.conf.dataDictionary.values():
            if dataArg.io == 'input':
                ignoreInputFileNames.update(dataArg.value)

        matchedOutputFileNames = []
        # sorted so that callers taking the first match, or merging all matches, are deterministic
        for fileName in sorted(os.listdir('.')):
            if not fnmatch.fnmatch(fileName, pattern):
                continue
            if fileName in ignoreInputFileNames:
                msg.info('Ignoring input file: %s', fileName)
            else:
                matchedOutputFileNames.append(fileName)
        return matchedOutputFileNames

    def _mergeBSfiles(self, inputFiles, outputFile):
        """Merge several BS files into a single BS file with file_merging.

        :param inputFiles: names of the BS files to merge
        :param outputFile: name of the merged file; must end in "._0001.data"
        :returns: 0 on success, 1 if file_merging returned a non-zero code
        :raises TransformExecutionException: if outputFile has the wrong suffix,
            the file list cannot be written, or file_merging cannot be run
        """
        msg.info(f'Merging multiple BS files ({inputFiles}) into {outputFile}')

        mergedSuffix = '._0001.data'
        # The user should never need to use this directly, but check it just in case...
        if not outputFile.endswith(mergedSuffix):
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                'Output merged BS filename must end in "._0001.data"')
        # file_merging is given the output name without the suffix
        outputFileBase = outputFile[:-len(mergedSuffix)]

        # Write the list of input files to a text file, to use it as a input for file_merging
        mergeBSFileList = 'RAWFileMerge.list'
        try:
            with open(mergeBSFileList, 'w') as BSFileList:
                BSFileList.writelines(f'{fname}\n' for fname in inputFiles)
        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                f'Got an error when writing list of BS files to {mergeBSFileList}: {e}') from e

        cmd = f'file_merging {mergeBSFileList} 0 {outputFileBase}'
        msg.info('running command for merging (in original asetup env): %s', cmd)
        try:
            mergeBSFailure = subprocess.call(cmd, shell=True)
        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                f'Exception raised when merging BS files using file_merging: {e}') from e
        msg.debug('file_merging return code %s', mergeBSFailure)

        if mergeBSFailure != 0:
            msg.error('file_merging returned error (%s) no merged BS file created', mergeBSFailure)
            return 1
        return 0

    def _splitBSfile(self, streamsList, allStreamsFileName, splitFileName):
        """Extract the given streams from a BS file with trigbs_extractStream.py.

        The file produced by the tool (expected to match '*_athenaHLT*.data')
        is renamed to splitFileName.

        :param streamsList: names of the streams to keep
        :param allStreamsFileName: BS file to split
        :param splitFileName: name to give the split file
        :returns: 0 on success, 1 if the tool failed or no split file was found
        :raises TransformExecutionException: if the tool cannot be run or the rename fails
        """
        # merge list of streams
        outputStreams = ','.join(str(stream) for stream in streamsList)
        msg.info('Splitting stream %s from BS file', outputStreams)

        cmd = 'trigbs_extractStream.py -s ' + outputStreams + ' ' + allStreamsFileName
        msg.info('running command for splitting (in original asetup env): %s', cmd)
        try:
            splitStreamFailure = subprocess.call(cmd, shell=True)
        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                'Exception raised when selecting stream with trigbs_extractStream.py in file {0}: {1}'.format(allStreamsFileName, e)) from e
        msg.debug('trigbs_extractStream.py splitting return code %s', splitStreamFailure)

        if splitStreamFailure != 0:
            msg.warning('trigbs_extractStream.py returned error (%s) no split BS file created', splitStreamFailure)
            return 1

        # know that the format will be of the form ####._athenaHLT.####.data
        expectedStreamFileName = '*_athenaHLT*.data'
        matchedOutputFileName = self._findOutputFiles(expectedStreamFileName)
        if not matchedOutputFileName:
            msg.error('trigbs_extractStream.py did not create expected file (%s)', expectedStreamFileName)
            return 1
        self._renamefile(matchedOutputFileName[0], splitFileName)
        return 0

    def _renamefile(self, currentFileName, newFileName):
        """Rename currentFileName to newFileName.

        Used to give files created by athenaHLT the name requested via the
        transform arguments.

        :raises TransformExecutionException: if the rename fails
        """
        msg.info('Renaming file from %s to %s', currentFileName, newFileName)
        try:
            os.rename(currentFileName, newFileName)
        except OSError as e:
            errMsg = 'Exception raised when renaming {0} to {1}: {2}'.format(currentFileName, newFileName, e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'), errMsg) from e

    def postExecute(self):
        """Check the HLT processes and post-process their outputs, then run athenaExecutor.postExecute.

        In order: flag HLTMPPU child failures found in the log, merge the child
        logs into the substep log, merge the expert-monitoring histograms,
        merge/split/rename the BS output, call the parent postExecute and
        finally run the debug stream postRun step on the stream-selected BS file.
        """
        # Adding check for HLTMPPU.*Child Issue in the log file
        # - Throws an error message if there so we catch that the child died
        # - Also sets the return code of the mother process to mark the job as failed
        # - Is based on trfValidation.scanLogFile
        self._scanLogForChildIssues()

        # Merge child log files into parent log file
        # is needed to make sure all child log files are scanned
        self._mergeChildLogFiles()

        msg.info("Check for expert-monitoring.root file")
        self._mergeExpertMonitoring()

        msg.info("Search for created BS files, and rename if single file found")
        argInDict = self._postProcessBSOutput()

        msg.info('Now run athenaExecutor:postExecute')
        super().postExecute()

        if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
            if argInDict is None:
                # previously this dereferenced an empty dict and crashed
                msg.warning('No renamed BS output file available, so skip debug stream postRun step after streamSelection')
            else:
                # Do debug stream postRun step for BS file that contains events after the streamSelection
                fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
                dbgStream.dbgPostRun(argInDict.value[0], fileNameDbg[0], self.conf.argdict, isSplitStream=True)

    def _scanLogForChildIssues(self):
        """Set self._rc when the substep log reports an HLTMPPU 'Child Issue'.

        The return code is the signal number quoted in the message, or 1 if no
        signal number is found.
        """
        log = self._logFileName
        msg.debug('Now scanning logfile {0} for HLTMPPU Child Issues'.format(log))
        try:
            # lineByLine is a generator: the loop is inside the try so that a
            # failure to open the file on the first iteration is also caught
            for line, _ in lineByLine(log, substepName=self._substep):
                # Check to see if any of the hlt children had an issue
                if 'Child Issue' not in line:
                    continue
                signalMatch = re.search('signal ([0-9]+)', line)
                # signal not found in message, so use 1 to highlight failure
                signal = int(signalMatch.group(1)) if signalMatch else 1
                msg.error('Detected issue with HLTChild, setting mother return code to %s', signal)
                self._rc = signal
        except IOError as e:
            msg.error('Failed to open transform logfile {0}: {1}'.format(log, e))

    def _mergeChildLogFiles(self):
        """Append the athenaHLT child logs to the substep log and record the HLT decisions.

        Child log files (athenaHLT:XX.out and .err) are found by listing the
        working directory rather than relying on nprocs being defined. The
        'accepted:'/'rejected:' counts found in them are summed and, if
        HIST_DEBUGSTREAMMON is requested, passed to dbgStream.getHltDecision.

        :raises TransformExecutionException: if the log files cannot be read or written
        """
        rejected = 0
        accepted = 0
        try:
            # open original log file (log.BSRDOtoRAW) to merge child files into
            with open(self._logFileName, 'a') as merged_file:
                for fileName in sorted(os.listdir('.')):
                    # expected child log files should be of the format athenaHLT:XX.out and .err
                    if not fnmatch.fnmatch(fileName, 'athenaHLT:*'):
                        continue
                    msg.info('Merging child log file (%s) into %s', fileName, self._logFileName)
                    with open(fileName) as log_file:
                        # write header information ### Output from athenaHLT:XX.out/err ###
                        merged_file.write('### Output from {} ###\n'.format(fileName))
                        for line in log_file:
                            merged_file.write(line)
                            # Count accepted/rejected events; parsed by regex rather than
                            # a fixed column so unrelated lines cannot raise ValueError
                            counter = self._hltDecisionCountRE.search(line)
                            if counter is None:
                                continue
                            if counter.group(1) == 'accepted':
                                accepted += int(counter.group(2))
                            else:
                                rejected += int(counter.group(2))

            if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
                # Add the HLT_accepted_events and HLT_rejected_events histograms to the output file
                dbgStream.getHltDecision(accepted, rejected, self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value[0])

        except OSError as e:
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                'Exception raised when merging log files into {0}: {1}'.format(self._logFileName, e)) from e

    def _mergeExpertMonitoring(self):
        """Merge the mother and worker expert-monitoring.root files into outputHIST_HLTMONFile.

        The BS-BS step generates expert-monitoring.root (mother process) and
        athenaHLT_workers/*/expert-monitoring.root (child processes); to be
        saved on panda they are combined under the requested output name.

        :raises TransformExecutionException: if renaming the mother file or running hadd fails with OSError
        """
        expectedFileName = 'expert-monitoring.root'

        # first check athenaHLT step actually completed
        if self._rc != 0:
            msg.info('HLT step failed (with status %s) so skip HIST_HLTMON filename check', self._rc)
            return
        # next check argument is in dictionary as a requested output
        if 'outputHIST_HLTMONFile' not in self.conf.argdict:
            msg.info('HLTMON argument not defined so skip %s check', expectedFileName)
            return
        outputHistFile = self.conf.argdict['outputHIST_HLTMONFile'].value[0]

        # rename the mother file
        expectedMotherFileName = 'expert-monitoring-mother.root'
        if os.path.isfile(expectedFileName):
            msg.info('Renaming %s to %s', expectedFileName, expectedMotherFileName)
            try:
                os.rename(expectedFileName, expectedMotherFileName)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                    'Exception raised when renaming {0} to {1}: {2}'.format(expectedFileName, expectedMotherFileName, e)) from e
        else:
            msg.error('HLTMON argument defined but mother %s not created', expectedFileName)

        # merge worker files
        expectedWorkerFileName = 'athenaHLT_workers/athenaHLT-01/' + expectedFileName
        if os.path.isfile(expectedWorkerFileName) and os.path.isfile(expectedMotherFileName):
            msg.info('Merging worker and mother %s files to %s', expectedFileName, outputHistFile)
            # have checked that at least one worker file exists; the shell expands the worker glob
            cmd = 'hadd ' + outputHistFile + ' athenaHLT_workers/*/expert-monitoring.root expert-monitoring-mother.root'
            try:
                haddFailure = subprocess.call(cmd, shell=True)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                    'Exception raised when merging worker and mother {0} files to {1}: {2}'.format(expectedFileName, outputHistFile, e)) from e
            if haddFailure != 0:
                msg.error('hadd returned error (%s) when merging %s files to %s', haddFailure, expectedFileName, outputHistFile)
        else:
            msg.error('HLTMON argument defined %s but worker %s not created', outputHistFile, expectedFileName)

    def _postProcessBSOutput(self):
        """Merge, split and rename the BS file(s) written by the HLT step.

        Needed because the BS file is written with a different name (or names)
        than the one requested from the transform.

        :returns: the 'BS' data argument once the BS output has been given its
            requested name, otherwise None
        :raises TransformExecutionException: if no BS file was produced, or
            merging or the CostMonitoring split failed
        """
        dataDict = self.conf.dataDictionary
        if self._rc != 0:
            msg.error('HLT step failed (with status %s) so skip BS filename check', self._rc)
            return None
        if not ('BS' in dataDict or 'DRAW_TRIGCOST' in dataDict or 'HIST_DEBUGSTREAMMON' in dataDict):
            msg.info('BS, DRAW_TRIGCOST or HIST_DEBUGSTREAMMON output filetypes not defined so skip BS post processing')
            return None

        # expected string based on knowing that the format will be of form: ####._HLTMPPy_####.data
        expectedOutputFileName = '*_HLTMPPy_*.data'
        matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)

        if not matchedOutputFileNames:
            msg.error('No BS files created with expected name: %s - please check for earlier failures', expectedOutputFileName)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                f'No BS files created with expected name: {expectedOutputFileName} - please check for earlier failures')

        if len(matchedOutputFileNames) == 1:
            msg.info('Single BS file found: will split (if requested) and rename the file')
            BSFile = matchedOutputFileNames[0]
        else:
            # more than one BS file was created, so merge them
            msg.info('Multiple BS files found. A single BS file is required by the next transform steps, so they will be merged. Will split the merged file (if requested) and rename the file')
            mergedBSFile = matchedOutputFileNames[0].split('._0001.data')[0] + '.mrg._0001.data'  # a specific format is required to run file_merging
            if self._mergeBSfiles(matchedOutputFileNames, mergedBSFile):
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                    'Did not produce a merged BS file with file_merging')
            BSFile = 'tmp.BS.mrg'
            msg.info(f'Renaming temporary merged BS file to {BSFile}')
            self._renamefile(mergedBSFile, BSFile)

        # First check if we want to produce the COST DRAW output
        if 'DRAW_TRIGCOST' in dataDict:
            if self._splitBSfile(['CostMonitoring'], BSFile, dataDict['DRAW_TRIGCOST'].value[0]):
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                    'Did not produce any BS file when selecting CostMonitoring stream with trigbs_extractStream.py in file')

        # Run debug step for all streams
        if 'HIST_DEBUGSTREAMMON' in dataDict:
            self._postExecuteDebug(BSFile)

        # Rename BS file if requested
        if 'BS' not in dataDict:
            msg.info('BS output filetype not defined so skip renaming BS')
            return None

        argInDict = dataDict['BS']
        # If a stream (not All) is selected, then slim the original (many stream) BS output to the particular stream
        if 'streamSelection' in self.conf.argdict and self.conf.argdict['streamSelection'].value[0] != "All":
            splitEmpty = self._splitBSfile(self.conf.argdict['streamSelection'].value, BSFile, argInDict.value[0])
            if splitEmpty:
                msg.info('Did not produce any BS file when selecting stream with trigbs_extractStream.py in file')
                # The chosen streams contained no events: produce an empty BS file
                # and give it the requested output name, so that rejected events
                # do not lead to a non-zero exit code
                cmd_splitFailed = 'trigbs_failedStreamSelection.py ' + BSFile
                msg.info('running command for creating empty file: %s', cmd_splitFailed)
                failedSelectionRC = subprocess.call(cmd_splitFailed, shell=True)
                if failedSelectionRC != 0:
                    msg.warning('trigbs_failedStreamSelection.py returned error (%s)', failedSelectionRC)
                # expected filename will be of form: T0debug.runnumber.unknown_debug.unknown.RAW._lb0000._TRF._0001.data
                runnumber = eformat.EventStorage.pickDataReader(BSFile).runNumber()
                # NOTE(review): the '00' prefix assumes a 6-digit run number - confirm against trigbs_failedStreamSelection.py
                expectedOutputFileName = 'T0debug.00' + str(runnumber) + '.unknown_debug.unknown.RAW._lb0000._TRF._0001.data'
                # rename the empty file (0 events) to the requested output name
                self._renamefile(expectedOutputFileName, argInDict.value[0])
        else:
            msg.info('Stream "All" requested, so not splitting BS file')
            self._renamefile(BSFile, argInDict.value[0])
        return argInDict

    def _postExecuteDebug(self, outputBSFile):
        """Run the debug stream postRun analysis on a BS file.

        :param outputBSFile: BS file produced by the HLT step (merged if several
            were written), before any stream selection
        """
        msg.info("debug stream analysis in postExecute")

        # Set file name for debug stream analysis output
        fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
        msg.info('outputHIST_DEBUGSTREAMMONFile argument is {0}'.format(fileNameDbg))

        if os.path.isfile(fileNameDbg[0]):
            # Keep filename if not defined
            msg.info('Will use file created in PreRun step {0}'.format(fileNameDbg))
        else:
            msg.info('No file created in PreRun step {0}'.format(fileNameDbg))

        # Do debug stream postRun step
        dbgStream.dbgPostRun(outputBSFile, fileNameDbg[0], self.conf.argdict)
python.trigRecoExe.trigRecoExecutor
Definition: trigRecoExe.py:31
python.trigRecoExe.trigRecoExecutor.preExecute
def preExecute(self, input=set(), output=set())
Definition: trigRecoExe.py:37
vtune_athena.format
format
Definition: vtune_athena.py:14
python.trfUtils.lineByLine
def lineByLine(filename, strip=True, removeTimestamp=True, substepName=None)
Generator to return lines and line count from a file.
Definition: trfUtils.py:372
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.trigRecoExe.trigRecoExecutor._rc
_rc
Definition: trigRecoExe.py:356
python.trigRecoExe.trigRecoExecutor._splitBSfile
def _splitBSfile(self, streamsList, allStreamsFileName, splitFileName)
Definition: trigRecoExe.py:290
python.trigRecoExe.trigRecoExecutor._topOptionsFiles
_topOptionsFiles
Write the skeleton file and prep athena.
Definition: trigRecoExe.py:70
PyJobTransforms.trfExitCodes
Module for transform exit codes.
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.trfEnv.environmentUpdate
Class holding the update to an environment that will be passed on to an executor.
Definition: trfEnv.py:15
python.trfExceptions.TransformExecutionException
Base class for execution exceptions.
Definition: trfExceptions.py:62
PixelModuleFeMask_create_db.remove
string remove
Definition: PixelModuleFeMask_create_db.py:83
python.trigRecoExe.trigRecoExecutor._findOutputFiles
def _findOutputFiles(self, pattern)
Definition: trigRecoExe.py:235
python.trfUtils.setupDBRelease
def setupDBRelease(setup)
Run a DBRelease setup.
Definition: trfUtils.py:543
python.trfUtils.asetupReleaseIsOlderThan
def asetupReleaseIsOlderThan(asetup_string, major, minor=None)
Test (to the best of our knowledge) if the asetup release is older than a major, minor version number...
Definition: trfUtils.py:303
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.trfUtils.cvmfsDBReleaseCheck
def cvmfsDBReleaseCheck(dbrelease)
Validate a DBRelease exists on cvmfs and return the path to the setup script.
Definition: trfUtils.py:569
PyJobTransforms.trfExe
Transform execution functions.
Trk::open
@ open
Definition: BinningType.h:40
DeMoAtlasDataLoss.runNumber
string runNumber
Definition: DeMoAtlasDataLoss.py:64
CaloLCW_tf.group
group
Definition: CaloLCW_tf.py:28
PyJobTransforms.trfUtils
Transform utility functions.
python.trigRecoExe.trigRecoExecutor.postExecute
def postExecute(self)
Definition: trigRecoExe.py:328
python.trigRecoExe.trigRecoExecutor._prepAthenaCommandLine
def _prepAthenaCommandLine(self)
Definition: trigRecoExe.py:203
if
if(febId1==febId2)
Definition: LArRodBlockPhysicsV0.cxx:569
python.trigRecoExe.trigRecoExecutor._envUpdate
_envUpdate
Add input/output file information - this can't be done in init as we don't know what our inputs and o...
Definition: trigRecoExe.py:126
python.trigRecoExe.trigRecoExecutor._renamefile
def _renamefile(self, currentFileName, newFileName)
Definition: trigRecoExe.py:319
str
Definition: BTagTrackIpAccessor.cxx:11
python.trigRecoExe.trigRecoExecutor._mergeBSfiles
def _mergeBSfiles(self, inputFiles, outputFile)
Definition: trigRecoExe.py:253
python.trigRecoExe.trigRecoExecutor._postExecuteDebug
def _postExecuteDebug(self, outputBSFile)
Definition: trigRecoExe.py:520
python.trigTranslate.getTranslated
def getTranslated(runArgs, name, substep, first, output)
Definition: trigTranslate.py:92
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
python.trfUtils.unpackDBRelease
def unpackDBRelease(tarball, dbversion=None)
Ensure that the DBRelease tarball has been unpacked.
Definition: trfUtils.py:520
python.trfUtils.asetupReport
def asetupReport()
Return a string with a report of the current athena setup.
Definition: trfUtils.py:223