ATLAS Offline Software
python.trigRecoExe.trigRecoExecutor Class Reference
Inheritance diagram for python.trigRecoExe.trigRecoExecutor:
Collaboration diagram for python.trigRecoExe.trigRecoExecutor:

Public Member Functions

def preExecute (self, input=set(), output=set())
 
def postExecute (self)
 

Private Member Functions

def _prepAthenaCommandLine (self)
 
def _findOutputFiles (self, pattern)
 
def _mergeBSfiles (self, inputFiles, outputFile)
 
def _splitBSfile (self, streamsList, allStreamsFileName, splitFileName)
 
def _renamefile (self, currentFileName, newFileName)
 
def _postExecuteDebug (self, outputBSFile)
 

Private Attributes

 _topOptionsFiles
 Write the skeleton file and prep athena.
 
 _envUpdate
 Add input/output file information - this can't be done in init as we don't know what our inputs and outputs will be then.
 
 _rc
 

Detailed Description

Definition at line 30 of file trigRecoExe.py.

Member Function Documentation

◆ _findOutputFiles()

def python.trigRecoExe.trigRecoExecutor._findOutputFiles (   self,
  pattern 
)
private

Definition at line 234 of file trigRecoExe.py.

234  def _findOutputFiles(self, pattern):
235  # list to store the filenames of files matching pattern
236  matchedOutputFileNames = []
237  # list of input files that could be in the same folder and need ignoring
238  ignoreInputFileNames = []
239  for dataType, dataArg in self.conf.dataDictionary.items():
240  if dataArg.io == 'input':
241  ignoreInputFileNames.append(dataArg.value[0])
242  # loop over all files in folder to find matching output files
243  for file in os.listdir('.'):
244  if fnmatch.fnmatch(file, pattern):
245  if file in ignoreInputFileNames:
246  msg.info('Ignoring input file: %s', file)
247  else:
248  matchedOutputFileNames.append(file)
249  return matchedOutputFileNames
250 
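The matching logic above can be sketched as a small pure function. This is a minimal, testable approximation: `find_output_files` and its parameters are hypothetical names, and a plain list stands in for `os.listdir('.')` so no disk access is needed.

```python
import fnmatch

def find_output_files(filenames, pattern, ignore):
    """Return names matching pattern, skipping known input files.

    Sketch of the matching in _findOutputFiles; 'filenames' stands in
    for os.listdir('.') so the example runs without touching disk.
    """
    return [f for f in filenames
            if fnmatch.fnmatch(f, pattern) and f not in ignore]

files = ['run._HLTMPPy_0001.data', 'input.BS_RDO.data', 'log.BSRDOtoRAW']
print(find_output_files(files, '*_HLTMPPy_*.data', ['input.BS_RDO.data']))
# → ['run._HLTMPPy_0001.data']
```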

◆ _mergeBSfiles()

def python.trigRecoExe.trigRecoExecutor._mergeBSfiles (   self,
  inputFiles,
  outputFile 
)
private

Definition at line 252 of file trigRecoExe.py.

252  def _mergeBSfiles(self, inputFiles, outputFile):
253  msg.info(f'Merging multiple BS files ({inputFiles}) into {outputFile}')
254  # Write the list of input files to a text file, to use it as a input for file_merging
255  mergeBSFileList = 'RAWFileMerge.list'
256  try:
257  with open(mergeBSFileList, 'w') as BSFileList:
258  for fname in inputFiles:
259  BSFileList.write(f'{fname}\n')
260  except OSError as e:
261  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
262  f'Got an error when writing list of BS files to {mergeBSFileList}: {e}')
263 
264  # The user should never need to use this directly, but check it just in case...
265  if not outputFile.endswith('._0001.data'):
266  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
267  'Output merged BS filename must end in "._0001.data"')
268  else:
269  # We need to remove the suffix to pass the output as an argument to file_merging
270  outputFile = outputFile.split('._0001.data')[0]
271 
272  mergeBSFailure = 0
273  try:
274  cmd = f'file_merging {mergeBSFileList} 0 {outputFile}'
275  msg.info('running command for merging (in original asetup env): %s', cmd)
276  mergeBSFailure = subprocess.call(cmd, shell=True)
277  msg.debug('file_merging return code %s', mergeBSFailure)
278  except OSError as e:
279  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
280  f'Exception raised when merging BS files using file_merging: {e}')
281  if mergeBSFailure != 0:
282  msg.error('file_merging returned error (%s) no merged BS file created', mergeBSFailure)
283  return 1
284  return 0
285 
286 
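The suffix handling is the subtle part of the method: `file_merging` appends `._0001.data` itself, so the transform must strip it before passing the output name on. A minimal sketch of that check (the function name is hypothetical; the original does the strip with `split` rather than slicing, which is equivalent here):

```python
def merge_output_basename(output_file):
    """Validate and strip the '._0001.data' suffix that file_merging
    re-appends to its output argument (sketch of the check in
    _mergeBSfiles)."""
    suffix = '._0001.data'
    if not output_file.endswith(suffix):
        raise ValueError(f'Output merged BS filename must end in "{suffix}"')
    return output_file[:-len(suffix)]

print(merge_output_basename('tmp.BS.mrg._0001.data'))  # → tmp.BS.mrg
```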

◆ _postExecuteDebug()

def python.trigRecoExe.trigRecoExecutor._postExecuteDebug (   self,
  outputBSFile 
)
private

Definition at line 522 of file trigRecoExe.py.

522  def _postExecuteDebug(self, outputBSFile):
523  # Run postRun step debug stream analysis if output BS file and output histogram are set
524  msg.info("debug stream analysis in postExecute")
525 
526  # Set file name for debug stream analysis output
527  fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
528  msg.info('outputHIST_DEBUGSTREAMMONFile argument is {0}'.format(fileNameDbg))
529 
530  if(os.path.isfile(fileNameDbg[0])):
531  # Keep filename if not defined
532  msg.info('Will use file created in PreRun step {0}'.format(fileNameDbg))
533  else:
534  msg.info('No file created in PreRun step {0}'.format(fileNameDbg))
535 
536  # Do debug stream postRun step
537  dbgStream.dbgPostRun(outputBSFile, fileNameDbg[0], self.conf.argdict)
538 

◆ _prepAthenaCommandLine()

def python.trigRecoExe.trigRecoExecutor._prepAthenaCommandLine (   self)
private

Definition at line 202 of file trigRecoExe.py.

202  def _prepAthenaCommandLine(self):
203 
204  # When running from the DB athenaHLT needs no skeleton file
205  msg.info("Before build command line check if reading from DB")
206 
207  # Check if expecting to run from DB
208  removeSkeleton = False
209  if 'useDB' in self.conf.argdict:
210  removeSkeleton = True
211  elif 'athenaopts' in self.conf.argdict:
212  v = self.conf.argdict['athenaopts'].value
213  if '--use-database' in v or '-b' in v:
214  removeSkeleton = True
215 
216  if removeSkeleton and self._isCAEnabled():
217  msg.error('Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
218  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_ERROR'),
219  'Do not specify --CA when reading from the DB, CA config will already be contained in the SMK')
220  return 1
221 
222  # Due to athenaExecutor code don't remove skeleton, otherwise lose runargs too
223  # instead remove skeleton from _topOptionsFiles
224  if removeSkeleton and self._skeleton is not None:
225  msg.info('Read from DB: remove skeleton {0} from command'.format(self._skeleton))
226  self._topOptionsFiles.remove(self._skeleton[0])
227  else:
228  msg.info('Not reading from DB: keep skeleton in command')
229 
230  # Now build command line as in athenaExecutor
231  super(trigRecoExecutor, self)._prepAthenaCommandLine()
232 
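The decision to drop the skeleton can be isolated as a small predicate. This is a sketch under simplifying assumptions: `should_remove_skeleton` is a hypothetical name, and `argdict` here maps argument names to plain values rather than the transform argument objects used in the real code.

```python
def should_remove_skeleton(argdict):
    """Decide whether athenaHLT will run from the trigger DB and
    therefore needs no skeleton job options (sketch of the checks in
    _prepAthenaCommandLine)."""
    if 'useDB' in argdict:
        return True
    opts = argdict.get('athenaopts', [])
    return '--use-database' in opts or '-b' in opts

print(should_remove_skeleton({'useDB': True}))                     # → True
print(should_remove_skeleton({'athenaopts': ['--use-database']}))  # → True
print(should_remove_skeleton({}))                                  # → False
```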

◆ _renamefile()

def python.trigRecoExe.trigRecoExecutor._renamefile (   self,
  currentFileName,
  newFileName 
)
private

Definition at line 318 of file trigRecoExe.py.

318  def _renamefile(self, currentFileName, newFileName):
319  msg.info('Renaming file from %s to %s', currentFileName, newFileName)
320  try:
321  os.rename(currentFileName, newFileName)
322  except OSError as e:
323  msg.error('Exception raised when renaming {0} to {1}: {2}'.format(currentFileName, newFileName, e))
324  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
325  'Exception raised when renaming {0} to {1}: {2}'.format(currentFileName, newFileName, e))
326 

◆ _splitBSfile()

def python.trigRecoExe.trigRecoExecutor._splitBSfile (   self,
  streamsList,
  allStreamsFileName,
  splitFileName 
)
private

Definition at line 289 of file trigRecoExe.py.

289  def _splitBSfile(self, streamsList, allStreamsFileName, splitFileName):
290  # merge list of streams
291  outputStreams = ','.join(str(stream) for stream in streamsList)
292  msg.info('Splitting stream %s from BS file', outputStreams)
293  splitStreamFailure = 0
294  try:
295  cmd = 'trigbs_extractStream.py -s ' + outputStreams + ' ' + allStreamsFileName
296  msg.info('running command for splitting (in original asetup env): %s', cmd)
297  splitStreamFailure = subprocess.call(cmd, shell=True)
298  msg.debug('trigbs_extractStream.py splitting return code %s', splitStreamFailure)
299  except OSError as e:
300  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
301  'Exception raised when selecting stream with trigbs_extractStream.py in file {0}: {1}'.format(allStreamsFileName, e))
302  if splitStreamFailure != 0:
303  msg.warning('trigbs_extractStream.py returned error (%s) no split BS file created', splitStreamFailure)
304  return 1
305  else:
306  # know that the format will be of the form ####._athenaHLT.####.data
307  expectedStreamFileName = '*_athenaHLT*.data'
308  # list of filenames of files matching expectedStreamFileName
309  matchedOutputFileName = self._findOutputFiles(expectedStreamFileName)
310  if(len(matchedOutputFileName)):
311  self._renamefile(matchedOutputFileName[0], splitFileName)
312  return 0
313  else:
314  msg.error('trigbs_extractStream.py did not create the expected file (%s)', expectedStreamFileName)
315  return 1
316 
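Building the stream-selection command line is the only string manipulation in the method and can be shown in isolation. A minimal sketch (the helper name is hypothetical; the real code runs the command via `subprocess` in the original asetup environment):

```python
def build_split_command(streams, bs_file):
    """Compose the trigbs_extractStream.py invocation used to slim a
    BS file to the requested streams (sketch of _splitBSfile)."""
    return ('trigbs_extractStream.py -s '
            + ','.join(str(s) for s in streams) + ' ' + bs_file)

print(build_split_command(['Main', 'CostMonitoring'], 'all._HLTMPPy_0001.data'))
# → trigbs_extractStream.py -s Main,CostMonitoring all._HLTMPPy_0001.data
```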

◆ postExecute()

def python.trigRecoExe.trigRecoExecutor.postExecute (   self)

Definition at line 327 of file trigRecoExe.py.

327  def postExecute(self):
328 
329  # Adding check for HLTMPPU.*Child Issue in the log file
330  # - Throws an error message if there so we catch that the child died
331  # - Also sets the return code of the mother process to mark the job as failed
332  # - Is based on trfValidation.scanLogFile
333  log = self._logFileName
334  msg.debug('Now scanning logfile {0} for HLTMPPU Child Issues'.format(log))
335  # Using the generator so that lines can be grabbed by subroutines if needed for more reporting
336 
337  #Count the number of rejected events
338  rejected = 0
339  #Count the number of accepted events
340  accepted = 0
341 
342  try:
343  myGen = lineByLine(log, substepName=self._substep)
344  except IOError as e:
345  msg.error('Failed to open transform logfile {0}: {1}'.format(log, e))
346  for line, lineCounter in myGen:
347  # Check to see if any of the hlt children had an issue
348  if 'Child Issue' in line:
349  try:
350  signal = int((re.search('signal ([0-9]*)', line)).group(1))
351  except AttributeError:
352  # signal not found in message, so return 1 to highlight failure
353  signal = 1
354  msg.error('Detected issue with HLTChild, setting mother return code to %s', signal)
355  self._rc = signal
356 
357  # Merge child log files into parent log file
358  # is needed to make sure all child log files are scanned
359  # files are found by searching whole folder rather than relying on nprocs being defined
360  try:
361  # open original log file (log.BSRDOtoRAW) to merge child files into
362  with open(self._logFileName, 'a') as merged_file:
363  for file in os.listdir('.'):
364  # expected child log files should be of the format athenaHLT:XX.out and .err
365  if fnmatch.fnmatch(file, 'athenaHLT:*'):
366  msg.info('Merging child log file (%s) into %s', file, self._logFileName)
367  with open(file) as log_file:
368  # write header information ### Output from athenaHLT:XX.out/err ###
369  merged_file.write('### Output from {} ###\n'.format(file))
370  # write out file line by line
371  for line in log_file:
372  merged_file.write(line)
373  # Check for rejected events in log file
374  if 'rejected:' in line and int(line[14]) != 0:
375  #Add the number of rejected events
376  rejected += int(line[14:])
377  # Check for accepted events in log file
378  if 'accepted:' in line and int(line[14]) != 0:
379  #Add the number of accepted events
380  accepted += int(line[14:])
381  if re.search('DFDcmEmuSession.* Communication error', line) or re.search('DFDcmEmuSession.* No new event provided within the timeout limit', line):
382  msg.error('Caught DFDcmEmuSession error, aborting job')
383  self._rc = 1
384 
385  if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
386  # Add the HLT_accepted_events and HLT_rejected_events histograms to the output file
387  dbgStream.getHltDecision(accepted, rejected, self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value[0])
388 
389  except OSError as e:
390  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
391  'Exception raised when merging log files into {0}: {1}'.format(self._logFileName, e))
392 
393  msg.info("Check for expert-monitoring.root file")
394  # the BS-BS step generates the files:
395  # - expert-monitoring.root (from mother process)
396  # - athenaHLT_workers/*/expert-monitoring.root (from child processes)
397  # to save on panda it needs to be renamed via the outputHIST_HLTMONFile argument
398  expectedFileName = 'expert-monitoring.root'
399 
400  # first check athenaHLT step actually completed
401  if self._rc != 0:
402  msg.info('HLT step failed (with status %s) so skip HIST_HLTMON filename check', self._rc)
403  # next check argument is in dictionary as a requested output
404  elif 'outputHIST_HLTMONFile' in self.conf.argdict:
405 
406  # rename the mother file
407  expectedMotherFileName = 'expert-monitoring-mother.root'
408  if(os.path.isfile(expectedFileName)):
409  msg.info('Renaming %s to %s', expectedFileName, expectedMotherFileName)
410  try:
411  os.rename(expectedFileName, expectedMotherFileName)
412  except OSError as e:
413  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
414  'Exception raised when renaming {0} to {1}: {2}'.format(expectedFileName, expectedMotherFileName, e))
415  else:
416  msg.error('HLTMON argument defined but mother %s not created', expectedFileName)
417 
418  # merge worker files
419  expectedWorkerFileName = 'athenaHLT_workers/athenaHLT-01/' + expectedFileName
420  if(os.path.isfile(expectedWorkerFileName) and os.path.isfile(expectedMotherFileName)):
421  msg.info('Merging worker and mother %s files to %s', expectedFileName, self.conf.argdict['outputHIST_HLTMONFile'].value[0])
422  try:
423  # have checked that at least one worker file exists
424  cmd = 'hadd ' + self.conf.argdict['outputHIST_HLTMONFile'].value[0] + ' athenaHLT_workers/*/expert-monitoring.root expert-monitoring-mother.root'
425  subprocess.call(cmd, shell=True)
426  except OSError as e:
427  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
428  'Exception raised when merging worker and mother {0} files to {1}: {2}'.format(expectedFileName, self.conf.argdict['outputHIST_HLTMONFile'].value[0], e))
429  else:
430  msg.error('HLTMON argument defined %s but worker %s not created', self.conf.argdict['outputHIST_HLTMONFile'].value[0], expectedFileName)
431 
432  else:
433  msg.info('HLTMON argument not defined so skip %s check', expectedFileName)
434 
435  msg.info("Search for created BS files, and rename if single file found")
436  # The following is needed to handle the BS file being written with a different name (or names)
437  # base is from either the tmp value created by the transform or the value entered by the user
438 
439  argInDict = {}
440  if self._rc != 0:
441  msg.error('HLT step failed (with status %s) so skip BS filename check', self._rc)
442  elif 'BS' in self.conf.dataDictionary or 'DRAW_TRIGCOST' in self.conf.dataDictionary or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
443  # expected string based on knowing that the format will be of form: ####._HLTMPPy_####.data
444  expectedOutputFileName = '*_HLTMPPy_*.data'
445  # list of filenames of files matching expectedOutputFileName
446  matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)
447 
448 
449  # check there are file matches and rename appropriately
450  if len(matchedOutputFileNames) == 0:
451  msg.error('No BS files created with expected name: %s - please check for earlier failures', expectedOutputFileName)
452  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
453  f'No BS files created with expected name: {expectedOutputFileName} - please check for earlier failures')
454  else:
455  # if only one BS file was created
456  if len(matchedOutputFileNames) == 1:
457  msg.info('Single BS file found: will split (if requested) and rename the file')
458  BSFile = matchedOutputFileNames[0]
459 
460  # if more than one file BS was created, then merge them
461  else:
462  msg.info('Multiple BS files found. A single BS file is required by the next transform steps, so they will be merged. Will split the merged file (if requested) and rename the file')
463  mergedBSFile = matchedOutputFileNames[0].split('._0001.data')[0] + '.mrg._0001.data' # a specific format is required to run file_merging
464 
465  mergeFailed = self._mergeBSfiles(matchedOutputFileNames, mergedBSFile)
466  if(mergeFailed):
467  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
468  'Did not produce a merged BS file with file_merging')
469 
470  BSFile = 'tmp.BS.mrg'
471  msg.info(f'Renaming temporary merged BS file to {BSFile}')
472  self._renamefile(mergedBSFile, BSFile)
473 
474  # First check if we want to produce the COST DRAW output
475  if 'DRAW_TRIGCOST' in self.conf.dataDictionary:
476  splitFailed = self._splitBSfile(['CostMonitoring'], BSFile, self.conf.dataDictionary['DRAW_TRIGCOST'].value[0])
477  if(splitFailed):
478  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
479  'Did not produce any BS file when selecting CostMonitoring stream with trigbs_extractStream.py in file')
480 
481  # Run debug step for all streams
482  if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
483  self._postExecuteDebug(BSFile)
484 
485  # Rename BS file if requested
486  if 'BS' in self.conf.dataDictionary:
487  argInDict = self.conf.dataDictionary['BS']
488  # If a stream (not All) is selected, then slim the original (many stream) BS output to the particular stream
489  if 'streamSelection' in self.conf.argdict and self.conf.argdict['streamSelection'].value[0] != "All":
490  splitEmpty = self._splitBSfile(self.conf.argdict['streamSelection'].value, BSFile, argInDict.value[0])
491  if(splitEmpty):
492  msg.info('Did not produce any BS file when selecting stream with trigbs_extractStream.py in file')
493  #If splitEmpty==1, the chosen streams contained no events
494  #then run the command to produce an empty BS file and rename it to RAW.pool.root
495  #this stops non-zero exit code for rejected events
496  cmd_splitFailed = 'trigbs_failedStreamSelection.py ' + BSFile
497  msg.info('running command for creating empty file: %s', cmd_splitFailed)
498  subprocess.call(cmd_splitFailed, shell=True)
499  #Rename the empty file to "RAW.pool.root" to prevent failure
500  #expected filename will be of form: T0debug.runnumber.unknown_debug.unknown.RAW._lb0000._TRF._0001.data
501  runnumber = eformat.EventStorage.pickDataReader(BSFile).runNumber()
502  expectedOutputFileName = 'T0debug.00'+str(runnumber)+'.unknown_debug.unknown.RAW._lb0000._TRF._0001.data'
503  #rename the file to RAW.pool.root, this file will contain 0 events
504  self._renamefile(expectedOutputFileName, argInDict.value[0])
505  else:
506  msg.info('Stream "All" requested, so not splitting BS file')
507  self._renamefile(BSFile, argInDict.value[0])
508  else:
509  msg.info('BS output filetype not defined so skip renaming BS')
510  else:
511  msg.info('BS, DRAW_TRIGCOST or HIST_DEBUGSTREAMMON output filetypes not defined so skip BS post processing')
512 
513  msg.info('Now run athenaExecutor:postExecute')
514  super(trigRecoExecutor, self).postExecute()
515 
516  if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
517  # Do debug stream postRun step for BS file that contains events after the streamSelection
518  fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
519  dbgStream.dbgPostRun(argInDict.value[0], fileNameDbg[0], self.conf.argdict, isSplitStream=True)
520 
521 
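The "Child Issue" scan at the top of `postExecute` hinges on pulling a signal number out of the log line, with a fallback of 1 so the mother process always ends up with a non-zero return code. A minimal sketch of that extraction (the function name is hypothetical; note the original pattern uses `[0-9]*`, which this sketch tightens to `[0-9]+` so an empty match cannot reach `int()`):

```python
import re

def child_issue_signal(line):
    """Extract the signal number from an HLTMPPU 'Child Issue' log
    line, falling back to 1 when no signal is reported (sketch of the
    scan in postExecute)."""
    match = re.search(r'signal ([0-9]+)', line)
    return int(match.group(1)) if match else 1

print(child_issue_signal('HLTMPPU Child Issue: terminated with signal 11'))  # → 11
print(child_issue_signal('HLTMPPU Child Issue: no details'))                 # → 1
```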

◆ preExecute()

def python.trigRecoExe.trigRecoExecutor.preExecute (   self,
  input = set(),
  output = set() 
)

Definition at line 36 of file trigRecoExe.py.

36  def preExecute(self, input = set(), output = set()):
37  msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))
38 
39  # Check we actually have events to process!
40  if (self._inputEventTest and 'skipEvents' in self.conf.argdict and
41  self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None):
42  msg.debug('Will test for events to process')
43  for dataType in input:
44  inputEvents = self.conf.dataDictionary[dataType].nentries
45  msg.debug('Got {} events for {}'.format(inputEvents, dataType))
46  if not isinstance(inputEvents, int):
47  msg.warning('Are input events countable? Got nevents={} so disabling event count check for this input'.format(inputEvents))
48  elif self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) >= inputEvents:
49  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
50  'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor), inputEvents, dataType))
51 
52 
53  if self._skeleton is not None:
54  inputFiles = dict()
55  for dataType in input:
56  inputFiles[dataType] = self.conf.dataDictionary[dataType]
57  outputFiles = dict()
58  for dataType in output:
59  outputFiles[dataType] = self.conf.dataDictionary[dataType]
60 
61  # See if we have any 'extra' file arguments
62  for dataType, dataArg in self.conf.dataDictionary.items():
63  if dataArg.io == 'input' and self._name in dataArg.executor:
64  inputFiles[dataArg.subtype] = dataArg
65 
66  msg.info('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
67 
68  # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
69  self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
70  output = outputFiles)
71 
72 
74  if len(input) > 0:
75  self._extraMetadata['inputs'] = list(input)
76  if len(output) > 0:
77  self._extraMetadata['outputs'] = list(output)
78 
79 
80  asetupString = None
81  if 'asetup' in self.conf.argdict:
82  asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
83  else:
84  msg.info('Asetup report: {0}'.format(asetupReport()))
85 
86  # If legacy release, bring up centos7 container
87  OSSetupString = None
88  if asetupString is not None:
89  legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
90  currentOS = os.environ['ALRB_USER_PLATFORM']
91  if legacyOSRelease and "centos7" not in currentOS:
92  OSSetupString = "centos7"
93  msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
94 
95  # allow overriding the container OS using a flag
96  if 'runInContainer' in self.conf.argdict:
97  OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
98  msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
99 
100 
101  dbrelease = dbsetup = None
102  if 'DBRelease' in self.conf.argdict:
103  dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
104  if dbrelease:
105  # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
106  dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
107  if dbdMatch:
108  msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
109  if not os.access(dbrelease, os.R_OK):
110  msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
111  msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
112  dbrelease = dbdMatch.group(1)
113  dbsetup = cvmfsDBReleaseCheck(dbrelease)
114  else:
115  # Check if the DBRelease is setup
116  unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
117  if unpacked:
118  # Now run the setup.py script to customise the paths to the current location...
119  setupDBRelease(dbsetup)
120  # For cvmfs we want just the X.Y.Z release string (and also support 'current')
121  else:
122  dbsetup = cvmfsDBReleaseCheck(dbrelease)
123 
124  # Look for environment updates and prepare the athena command line
125  self._envUpdate = trfEnv.environmentUpdate()
126  # above is needed by _prepAthenaCommandLine, but remove the setStandardEnvironment so doesn't include imf or tcmalloc
127  # self._envUpdate.setStandardEnvironment(self.conf.argdict)
128  self._prepAthenaCommandLine()
129 
130  # to get athenaHLT to read in the relevant parts from the runargs file we have to translate them
131  if 'athenaHLT' in self._exe:
132  self._cmd.remove('runargs.BSRDOtoRAW.py')
133  # get list of translated arguments to be used by athenaHLT
134  optionList = getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output = outputFiles)
135  self._cmd.extend(optionList)
136  # updates for CA
137  if self._isCAEnabled():
138  msg.info("Running in CA mode")
139  # we don't use the runargs file with athenaHLT so add the JO and preExecs to the command line
140  self._cmd.append(self._skeletonCA)
141  if 'preExec' in self.conf.argdict:
142  self._cmd.extend(self.conf.argdict['preExec'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor))
143  msg.info('Command adjusted for CA to %s', self._cmd)
144  else:
145  msg.info("Running in legacy mode")
146 
147  # Run preRun step debug stream analysis if output histograms are set
148  if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
149  # Do debug stream preRun step and get asetup string from debug stream input files
150  dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary['BS_RDO'], self.conf.dataDictionary['HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
151  # Setup asetup from debug stream
152  # if no --asetup r2b:string was given and is not running with tzero/software/patches as TestArea
153  if asetupString is None and dbgAsetupString is not None:
154  asetupString = dbgAsetupString
155  msg.info('Will use asetup string for debug stream analysis %s', dbgAsetupString)
156  # If legacy release, bring up centos7 container
157  legacyOSRelease = asetupReleaseIsOlderThan(asetupString, 24)
158  currentOS = os.environ['ALRB_USER_PLATFORM']
159  if legacyOSRelease and "centos7" not in currentOS:
160  OSSetupString = "centos7"
161  msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
162  # allow overriding the container OS using a flag
163  if 'runInContainer' in self.conf.argdict:
164  OSSetupString = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
165  msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
166 
167  # Set database in command line if it was missing
168  if 'useDB' in self.conf.argdict and 'DBserver' not in self.conf.argdict and dbAlias:
169  msg.warn("Database alias will be set to %s", dbAlias)
170  self._cmd.append("--db-server " + dbAlias)
171  else:
172  msg.info("Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")
173 
174  # The following is needed to avoid conflicts in finding BS files produced by running the HLT step
175  # and those already existing in the working directory
176  if 'BS' in self.conf.dataDictionary or 'DRAW_TRIGCOST' in self.conf.dataDictionary or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
177  # expected string based on knowing that the format will be of form: ####._HLTMPPy_####.data
178  expectedOutputFileName = '*_HLTMPPy_*.data'
179  # list of filenames of files matching expectedOutputFileName
180  matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)
181  # check there are no file matches
182  if len(matchedOutputFileNames) > 0:
183  msg.error(f'Directory already contains files with expected output name format ({expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
184  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
185  f'Directory already contains files with expected output name format {expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')
186 
187  # Sanity check:
188  if OSSetupString is not None and asetupString is None:
189  raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
190  'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')
191 
192  # Call athenaExecutor parent as the above overrides what athenaExecutor would have done
193  super(athenaExecutor, self).preExecute(input, output)
194 
195  # Now we always write a wrapper, because it's very convenient for re-running individual substeps
196  # This will have asetup and/or DB release setups in it
197  # Do this last in this preExecute as the _cmd needs to be finalised
198  msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
199  self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
200  msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
201 
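The DBRelease handling above branches on whether the argument names a classic tarball or a plain version string. That dispatch can be sketched as a standalone function (the name `dbrelease_version` is hypothetical; the real code then unpacks the tarball or falls back to a cvmfs lookup):

```python
import os
import re

def dbrelease_version(dbrelease):
    """Return the X.Y.Z version when the DBRelease argument is a
    classic 'DBRelease-X.Y.Z.tar.gz' tarball name, else None (sketch
    of the pattern match in preExecute)."""
    m = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
    return m.group(1) if m else None

print(dbrelease_version('/cvmfs/some/path/DBRelease-31.8.1.tar.gz'))  # → 31.8.1
print(dbrelease_version('current'))                                   # → None
```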

Member Data Documentation

◆ _envUpdate

python.trigRecoExe.trigRecoExecutor._envUpdate
private

Add input/output file information - this can't be done in init as we don't know what our inputs and outputs will be then.

Do we need to run asetup first? DBRelease configuration

Definition at line 125 of file trigRecoExe.py.

◆ _rc

python.trigRecoExe.trigRecoExecutor._rc
private

Definition at line 355 of file trigRecoExe.py.

◆ _topOptionsFiles

python.trigRecoExe.trigRecoExecutor._topOptionsFiles
private

Write the skeleton file and prep athena.

Definition at line 69 of file trigRecoExe.py.


The documentation for this class was generated from the following file:
trigRecoExe.py