ATLAS Offline Software
Loading...
Searching...
No Matches
python.trigRecoExe.trigRecoExecutor Class Reference
Inheritance diagram for python.trigRecoExe.trigRecoExecutor:
Collaboration diagram for python.trigRecoExe.trigRecoExecutor:

Public Member Functions

 preExecute (self, input=set(), output=set())
 postExecute (self)

Public Attributes

 name

Static Public Attributes

str expectedOutputFileName = '*SingleStream.daq.RAW._*.data'

Protected Member Functions

 _prepAthenaCommandLine (self)
 _findOutputFiles (self, pattern)
 _mergeBSfiles (self, inputFiles, outputFile)
 _splitBSfile (self, streamsList, allStreamsFileName, splitFileName)
 _renamefile (self, currentFileName, newFileName)
 _postExecuteDebug (self, outputBSFile)

Protected Attributes

str _exe = 'setsid ' + self.conf.argdict['trigExe'].value
 _topOptionsFiles
 _substep
 _envUpdate = trfEnv.environmentUpdate()
 _skeletonCA
 _cmd
 _name
 _skeleton
int _rc = signal
 _logFileName

Detailed Description

Definition at line 30 of file trigRecoExe.py.

Member Function Documentation

◆ _findOutputFiles()

python.trigRecoExe.trigRecoExecutor._findOutputFiles ( self,
pattern )
protected

Definition at line 238 of file trigRecoExe.py.

def _findOutputFiles(self, pattern):
    """Return the names of files in the current directory that match *pattern*.

    Files registered as inputs in ``self.conf.dataDictionary`` (``io == 'input'``)
    are skipped: they may live in the same working directory and match the
    same glob as the produced outputs.

    :param pattern: ``fnmatch``-style glob, e.g. ``'*SingleStream.daq.RAW._*.data'``.
    :returns: sorted list of matching file names. The list is sorted so that
        callers that take the first element, or merge files in list order,
        behave reproducibly (``os.listdir`` order is arbitrary).
    """
    # Every file of every input argument must be ignored, not only the first one
    ignoreInputFileNames = set()
    for dataArg in self.conf.dataDictionary.values():
        if dataArg.io == 'input':
            ignoreInputFileNames.update(dataArg.value)

    matchedOutputFileNames = []
    for fileName in sorted(os.listdir('.')):
        if not fnmatch.fnmatch(fileName, pattern):
            continue
        if fileName in ignoreInputFileNames:
            msg.info('Ignoring input file: %s', fileName)
        else:
            matchedOutputFileNames.append(fileName)
    return matchedOutputFileNames

◆ _mergeBSfiles()

python.trigRecoExe.trigRecoExecutor._mergeBSfiles ( self,
inputFiles,
outputFile )
protected

Definition at line 256 of file trigRecoExe.py.

def _mergeBSfiles(self, inputFiles, outputFile):
    """Merge several bytestream (BS) files into a single file with ``file_merging``.

    The input file names are written, one per line, to ``RAWFileMerge.list``,
    which is passed to ``file_merging`` together with the output name stripped
    of its ``._0001.data`` suffix.

    :param inputFiles: iterable of BS file names to merge.
    :param outputFile: name of the merged file; must end in ``._0001.data``.
    :returns: 0 on success, 1 if ``file_merging`` returned a non-zero code.
    :raises trfExceptions.TransformExecutionException: (TRF_OUTPUT_FILE_ERROR)
        if the output name has the wrong suffix, the list file cannot be
        written, or ``file_merging`` cannot be started.
    """
    msg.info(f'Merging multiple BS files ({inputFiles}) into {outputFile}')
    mergeSuffix = '._0001.data'

    # The user should never need to use this directly, but check it just in case...
    # Validate before touching the filesystem so a bad name leaves no stray list file.
    if not outputFile.endswith(mergeSuffix):
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                        'Output merged BS filename must end in "._0001.data"')
    # Remove only the trailing suffix; the output is passed to file_merging without it.
    # (str.split would also cut at an earlier occurrence of the suffix in the name.)
    outputBase = outputFile[:-len(mergeSuffix)]

    # Write the list of input files to a text file, to use it as a input for file_merging
    mergeBSFileList = 'RAWFileMerge.list'
    try:
        with open(mergeBSFileList, 'w') as BSFileList:
            for fname in inputFiles:
                BSFileList.write(f'{fname}\n')
    except OSError as e:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                        f'Got an error when writing list of BS files to {mergeBSFileList}: {e}') from e

    try:
        cmd = f'file_merging {mergeBSFileList} 0 {outputBase}'
        msg.info('running command for merging (in original asetup env): %s', cmd)
        mergeBSFailure = subprocess.call(cmd, shell=True)
        msg.debug('file_merging return code %s', mergeBSFailure)
    except OSError as e:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                        f'Exception raised when merging BS files using file_merging: {e}') from e

    if mergeBSFailure != 0:
        msg.error('file_merging returned error (%s) no merged BS file created', mergeBSFailure)
        return 1
    return 0
290

◆ _postExecuteDebug()

python.trigRecoExe.trigRecoExecutor._postExecuteDebug ( self,
outputBSFile )
protected

Definition at line 525 of file trigRecoExe.py.

def _postExecuteDebug(self, outputBSFile):
    """Run the post-run step of the debug stream analysis on *outputBSFile*.

    The histogram file comes from the ``outputHIST_DEBUGSTREAMMONFile``
    argument. Whether its first entry already exists (it may have been created
    in the pre-run step) is only logged; ``dbgStream.dbgPostRun`` is called
    either way.
    """
    msg.info("debug stream analysis in postExecute")

    dbgHistFiles = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
    msg.info('outputHIST_DEBUGSTREAMMONFile argument is {0}'.format(dbgHistFiles))

    dbgHistFile = dbgHistFiles[0]
    # Report whether the pre-run step already produced the histogram file
    preRunReport = ('Will use file created in PreRun step {0}'
                    if os.path.isfile(dbgHistFile)
                    else 'No file created in PreRun step {0}')
    msg.info(preRunReport.format(dbgHistFiles))

    dbgStream.dbgPostRun(outputBSFile, dbgHistFile, self.conf.argdict)
if(pathvar)

◆ _prepAthenaCommandLine()

python.trigRecoExe.trigRecoExecutor._prepAthenaCommandLine ( self)
protected

Definition at line 206 of file trigRecoExe.py.

def _prepAthenaCommandLine(self):
    """Build the athena command line, dropping the skeleton when reading from the DB.

    The job is treated as "reading from DB" if ``useDB`` is given, or if
    ``athenaopts`` contains ``--use-database`` or ``-b``. In that case the
    skeleton is removed from ``self._topOptionsFiles``. The skeleton itself
    is kept, because the runargs are also handled through it. The command
    line is then built by the parent ``_prepAthenaCommandLine``.

    :raises trfExceptions.TransformExecutionException: (TRF_ARG_ERROR) if
        reading from the DB is combined with CA mode.
    """
    # When running from the DB, no skeleton file is needed
    msg.info("Before build command line check if reading from DB")

    # Check if expecting to run from DB
    removeSkeleton = 'useDB' in self.conf.argdict
    if not removeSkeleton and 'athenaopts' in self.conf.argdict:
        athenaOpts = self.conf.argdict['athenaopts'].value
        removeSkeleton = '--use-database' in athenaOpts or '-b' in athenaOpts

    if removeSkeleton and self._isCAEnabled():
        caErrorText = 'Do not specify --CA when reading from the DB, CA config will already be contained in the SMK'
        msg.error(caErrorText)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_ARG_ERROR'), caErrorText)

    # Due to athenaExecutor code don't remove skeleton, otherwise lose runargs too
    # instead remove skeleton from _topOptionsFiles
    if removeSkeleton and self._skeleton is not None:
        msg.info('Read from DB: remove skeleton {0} from command'.format(self._skeleton))
        if self._skeleton[0] in self._topOptionsFiles:
            self._topOptionsFiles.remove(self._skeleton[0])
        else:
            # Nothing to remove: the skeleton is already absent from the command
            msg.warning('Skeleton {0} not found in top options files {1}'.format(self._skeleton[0], self._topOptionsFiles))
    else:
        msg.info('Not reading from DB: keep skeleton in command')

    # Now build command line as in athenaExecutor
    super(trigRecoExecutor, self)._prepAthenaCommandLine()

◆ _renamefile()

python.trigRecoExe.trigRecoExecutor._renamefile ( self,
currentFileName,
newFileName )
protected

Definition at line 323 of file trigRecoExe.py.

def _renamefile(self, currentFileName, newFileName):
    """Rename *currentFileName* to *newFileName* with ``os.rename``.

    :raises trfExceptions.TransformExecutionException: (TRF_OUTPUT_FILE_ERROR)
        if the rename fails; the error is logged before raising.
    """
    msg.info('Renaming file from %s to %s', currentFileName, newFileName)
    try:
        os.rename(currentFileName, newFileName)
    except OSError as e:
        # Build the message once so the log and the exception text agree
        renameErrorText = 'Exception raised when renaming {0} to {1}: {2}'.format(currentFileName, newFileName, e)
        msg.error(renameErrorText)
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                        renameErrorText) from e

◆ _splitBSfile()

python.trigRecoExe.trigRecoExecutor._splitBSfile ( self,
streamsList,
allStreamsFileName,
splitFileName )
protected

Definition at line 293 of file trigRecoExe.py.

def _splitBSfile(self, streamsList, allStreamsFileName, splitFileName):
    """Extract the streams in *streamsList* from *allStreamsFileName* into *splitFileName*.

    Runs ``trigbs_extractStream.py -s <streams> <file>`` and renames the file
    it produces. With a single stream the produced file is named after that
    stream; with several it is the "accepted" stream file.

    :returns: 0 on success, 1 if the command failed or no output file was found.
    :raises trfExceptions.TransformExecutionException: (TRF_OUTPUT_FILE_ERROR)
        if the command cannot be started or the produced file cannot be renamed.
    """
    # merge list of streams into the comma-separated form passed to -s
    outputStreams = ','.join(str(stream) for stream in streamsList)
    msg.info('Splitting stream %s from BS file', outputStreams)
    try:
        cmd = f'trigbs_extractStream.py -s {outputStreams} {allStreamsFileName}'
        msg.info('running command for splitting (in original asetup env): %s', cmd)
        splitStreamFailure = subprocess.call(cmd, shell=True)
        msg.debug('trigbs_extractStream.py splitting return code %s', splitStreamFailure)
    except OSError as e:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                        'Exception raised when selecting stream with trigbs_extractStream.py in file {0}: {1}'.format(allStreamsFileName, e)) from e

    if splitStreamFailure != 0:
        msg.warning('trigbs_extractStream.py returned error (%s) no split BS file created', splitStreamFailure)
        return 1

    # If more than one stream selected, trigbs_extractStream produces an "accepted" stream file
    streamName = outputStreams if len(streamsList) == 1 else 'accepted'
    expectedStreamFileName = f'*_{streamName}.*.RAW._*.data'
    matchedOutputFileNames = self._findOutputFiles(expectedStreamFileName)
    if not matchedOutputFileNames:
        msg.error('trigbs_extractStream.py did not create expected file (%s)', expectedStreamFileName)
        return 1
    if len(matchedOutputFileNames) > 1:
        # Only one file can become the split output; make the others visible in the log
        msg.warning('Several files match %s (%s), only %s will be used',
                    expectedStreamFileName, matchedOutputFileNames, matchedOutputFileNames[0])
    self._renamefile(matchedOutputFileNames[0], splitFileName)
    return 0

◆ postExecute()

python.trigRecoExe.trigRecoExecutor.postExecute ( self)

Definition at line 332 of file trigRecoExe.py.

def postExecute(self):
    """Post-process the output of the trigger substep, then call athenaExecutor.postExecute.

    Steps, in order:

    1. Scan the mother log file for HLTMPPU "Child Issue" messages and set
       ``self._rc`` to the reported signal (1 if none is given). This marks the
       job as failed.
    2. Append every ``athenaHLT:*`` child log file to the mother log file.
       While doing so, sum the accepted/rejected event counts and flag
       DFDcmEmuSession errors by setting ``self._rc = 1``.
    3. If ``outputHIST_HLTMONFile`` is requested, rename the mother
       ``expert-monitoring.root`` and ``hadd`` it with the worker files.
    4. If BS, DRAW_TRIGCOST or HIST_DEBUGSTREAMMON outputs are requested,
       find the produced BS file(s), merging them if there are several. Then
       split out CostMonitoring, run the debug stream step, and split or
       rename the file into the BS output.
    5. Call ``athenaExecutor.postExecute``. If requested, run the debug stream
       post-run step on the final BS output.

    :raises trfExceptions.TransformExecutionException: (TRF_OUTPUT_FILE_ERROR)
        on file errors while merging logs or histograms, or when the expected BS
        output is missing or cannot be merged/split.
    """
    # - Is based on trfValidation.scanLogFile
    log = self._logFileName
    msg.debug('Now scanning logfile {0} for HLTMPPU Child Issues'.format(log))

    # lineByLine may only open the file once iteration starts (generator), so the
    # whole loop is guarded rather than just the call.
    try:
        for line, _lineCounter in lineByLine(log, substepName=self._substep):
            # Check to see if any of the hlt children had an issue
            if 'Child Issue' in line:
                signalMatch = re.search(r'signal ([0-9]+)', line)
                # signal not found in message, so use 1 to highlight failure
                signal = int(signalMatch.group(1)) if signalMatch else 1
                msg.error('Detected issue with HLTChild, setting mother return code to %s', signal)
                self._rc = signal
    except IOError as e:
        msg.error('Failed to open transform logfile {0}: {1}'.format(log, e))

    def eventCount(countLine):
        # Event counts are read from column 14 onwards of the summary line;
        # anything that is not an integer there counts as 0 instead of aborting.
        try:
            return int(countLine[14:])
        except ValueError:
            return 0

    dcmErrorPattern = re.compile(r'DFDcmEmuSession.* (Communication error|No new event provided within the timeout limit)')

    # Count the number of rejected / accepted events reported by the children
    rejected = 0
    accepted = 0

    # Merge child log files into parent log file
    # is needed to make sure all child log files are scanned
    # files are found by searching whole folder rather than relying on nprocs being defined
    try:
        with open(self._logFileName, 'a') as merged_file:
            for childLogName in sorted(os.listdir('.')):
                # expected child log files should be of the format athenaHLT:XX.out and .err
                if not fnmatch.fnmatch(childLogName, 'athenaHLT:*'):
                    continue
                msg.info('Merging child log file (%s) into %s', childLogName, self._logFileName)
                with open(childLogName) as log_file:
                    # write header infomation ### Output from athenaHLT:XX.out/err ###
                    merged_file.write('### Output from {} ###\n'.format(childLogName))
                    for line in log_file:
                        merged_file.write(line)
                        if 'rejected:' in line:
                            rejected += eventCount(line)
                        if 'accepted:' in line:
                            accepted += eventCount(line)
                        if dcmErrorPattern.search(line):
                            msg.error('Caught DFDcmEmuSession error, aborting job')
                            self._rc = 1

        if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
            # Add the HLT_accepted_events and HLT_rejected_events histograms to the output file
            dbgStream.getHltDecision(accepted, rejected, self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value[0])

    except OSError as e:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                        'Exception raised when merging log files into {0}: {1}'.format(self._logFileName, e)) from e

    msg.info("Check for expert-monitoring.root file")
    # the BS-BS step generates the files:
    # - expert-monitoring.root (from mother process)
    # - athenaHLT_workers/*/expert-monitoring.root (from child processes)
    # to save on panda it needs to be renamed via the outputHIST_HLTMONFile argument
    expectedFileName = 'expert-monitoring.root'

    # first check if trigger step actually completed
    if self._rc != 0:
        msg.info('HLT step failed (with status %s) so skip HIST_HLTMON filename check', self._rc)
    # next check argument is in dictionary as a requested output
    elif 'outputHIST_HLTMONFile' in self.conf.argdict:
        hltmonFileName = self.conf.argdict['outputHIST_HLTMONFile'].value[0]

        # rename the mother file
        expectedMotherFileName = 'expert-monitoring-mother.root'
        if os.path.isfile(expectedFileName):
            msg.info('Renaming %s to %s', expectedFileName, expectedMotherFileName)
            try:
                os.rename(expectedFileName, expectedMotherFileName)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                'Exception raised when renaming {0} to {1}: {2}'.format(expectedFileName, expectedMotherFileName, e)) from e
        else:
            msg.error('HLTMON argument defined but mother %s not created', expectedFileName)

        # merge worker files
        expectedWorkerFileName = 'athenaHLT_workers/athenaHLT-01/' + expectedFileName
        if os.path.isfile(expectedWorkerFileName) and os.path.isfile(expectedMotherFileName):
            msg.info('Merging worker and mother %s files to %s', expectedFileName, hltmonFileName)
            # have checked that at least one worker file exists; the shell expands the worker glob
            cmd = 'hadd ' + hltmonFileName + ' athenaHLT_workers/*/expert-monitoring.root expert-monitoring-mother.root'
            try:
                haddFailure = subprocess.call(cmd, shell=True)
            except OSError as e:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                'Exception raised when merging worker and mother {0} files to {1}: {2}'.format(expectedFileName, hltmonFileName, e)) from e
            if haddFailure != 0:
                msg.error('hadd returned error (%s) when merging %s files to %s', haddFailure, expectedFileName, hltmonFileName)
        else:
            msg.error('HLTMON argument defined %s but worker %s not created', hltmonFileName, expectedFileName)

    else:
        msg.info('HLTMON argument not defined so skip %s check', expectedFileName)

    msg.info("Search for created BS files, and rename if single file found")
    # The following is needed to handle the BS file being written with a different name (or names)
    # base is from either the tmp value created by the transform or the value entered by the user

    # BS output argument; stays None unless the BS output was produced below
    argInDict = None
    if self._rc != 0:
        msg.error('HLT step failed (with status %s) so skip BS filename check', self._rc)
    elif 'BS' in self.conf.dataDictionary or 'DRAW_TRIGCOST' in self.conf.dataDictionary or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
        # list of filenames of files matching expectedOutputFileName
        matchedOutputFileNames = self._findOutputFiles(self.expectedOutputFileName)

        # check there are file matches and rename appropriately
        if not matchedOutputFileNames:
            msg.error('No BS files created with expected name: %s - please check for earlier failures', self.expectedOutputFileName)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                            f'No BS files created with expected name: {self.expectedOutputFileName} - please check for earlier failures')

        if len(matchedOutputFileNames) == 1:
            msg.info('Single BS file found: will split (if requested) and rename the file')
            BSFile = matchedOutputFileNames[0]
        else:
            # more than one BS file was created, so merge them
            msg.info('Multiple BS files found. A single BS file is required by the next transform steps, so they will be merged. Will split the merged file (if requested) and rename the file')
            mergedBSFile = matchedOutputFileNames[0].split('._0001.data')[0] + '.mrg._0001.data'  # a specific format is required to run file_merging

            mergeFailed = self._mergeBSfiles(matchedOutputFileNames, mergedBSFile)
            if mergeFailed:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                'Did not produce a merged BS file with file_merging')

            BSFile = 'tmp.BS.mrg'
            msg.info(f'Renaming temporary merged BS file to {BSFile}')
            self._renamefile(mergedBSFile, BSFile)

        # First check if we want to produce the COST DRAW output
        if 'DRAW_TRIGCOST' in self.conf.dataDictionary:
            splitFailed = self._splitBSfile(['CostMonitoring'], BSFile, self.conf.dataDictionary['DRAW_TRIGCOST'].value[0])
            if splitFailed:
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                                'Did not produce any BS file when selecting CostMonitoring stream with trigbs_extractStream.py in file')

        # Run debug step for all streams
        if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
            self._postExecuteDebug(BSFile)

        # Rename BS file if requested
        if 'BS' in self.conf.dataDictionary:
            argInDict = self.conf.dataDictionary['BS']
            # If a stream (not All) is selected, then slim the orignal (many stream) BS output to the particular stream
            if 'streamSelection' in self.conf.argdict and self.conf.argdict['streamSelection'].value[0] != "All":
                splitEmpty = self._splitBSfile(self.conf.argdict['streamSelection'].value, BSFile, argInDict.value[0])
                if splitEmpty:
                    msg.info('Did not produce any BS file when selecting stream with trigbs_extractStream.py in file')
                    # The chosen streams contained no events: produce an empty BS file and
                    # rename it to the requested output, so rejected events do not give a non-zero exit code
                    cmd_splitFailed = 'trigbs_failedStreamSelection.py ' + BSFile
                    msg.info('running command for creating empty file: %s', cmd_splitFailed)
                    subprocess.call(cmd_splitFailed, shell=True)
                    # expected filename will be of form: T0debug.runnumber.unknown_debug.unknown.RAW._lb0000._TRF._0001.data
                    runnumber = eformat.EventStorage.pickDataReader(BSFile).runNumber()
                    emptyBSFileName = 'T0debug.00' + str(runnumber) + '.unknown_debug.unknown.RAW._lb0000._TRF._0001.data'
                    # rename the empty file to the requested output, this file will contain 0 events
                    self._renamefile(emptyBSFileName, argInDict.value[0])
            else:
                msg.info('Stream "All" requested, so not splitting BS file')
                self._renamefile(BSFile, argInDict.value[0])
        else:
            msg.info('BS output filetype not defined so skip renaming BS')
    else:
        msg.info('BS, DRAW_TRIGCOST or HIST_DEBUGSTREAMMON output filetypes not defined so skip BS post processing')

    msg.info('Now run athenaExecutor:postExecute')
    super(trigRecoExecutor, self).postExecute()

    if "HIST_DEBUGSTREAMMON" in self.conf.dataDictionary:
        if argInDict is None:
            # No BS output was produced (not requested, or HLT step failed): nothing to analyse
            msg.info('No BS output file available, so skip debug stream postRun step after the streamSelection')
        else:
            # Do debug stream postRun step for BS file that contains events after the streamSelection
            fileNameDbg = self.conf.argdict["outputHIST_DEBUGSTREAMMONFile"].value
            dbgStream.dbgPostRun(argInDict.value[0], fileNameDbg[0], self.conf.argdict, isSplitStream=True)
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:179

◆ preExecute()

python.trigRecoExe.trigRecoExecutor.preExecute ( self,
input = set(),
output = set() )

Definition at line 39 of file trigRecoExe.py.

def preExecute(self, input = set(), output = set()):
    """Prepare the trigger substep command and wrapper script.

    This replaces what athenaExecutor.preExecute would do. It:

    - sets the executable, prefixed with ``setsid``;
    - checks that ``skipEvents`` leaves events to process;
    - builds the top options and the athena command line, translating the
      runargs for athenaHLT/athenaEF;
    - picks the asetup/DBRelease/container setup, with the debug stream
      pre-run step able to supply an asetup string and DB alias;
    - refuses to run if BS-like outputs already exist in the working directory;
    - calls the grandparent preExecute and writes the wrapper script.

    ``input`` and ``output`` are only iterated and copied here, never mutated.

    :raises trfExceptions.TransformExecutionException: TRF_NOEVENTS if there are
        no events left to process; TRF_OUTPUT_FILE_ERROR if expected output
        files already exist; TRF_EXEC_SETUP_FAIL if a container is required
        but no athena version is known.
    """
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    # setsid needed to fix process-group id of child processes to be the same as mother process (ATR-20513)
    self._exe = 'setsid ' + self.conf.argdict['trigExe'].value

    def selectContainerOS(asetup, currentOSSetup):
        # Return the container OS to use for *asetup*, starting from *currentOSSetup*.
        osSetup = currentOSSetup
        # If legacy release, bring up centos7 container
        if asetup is not None:
            legacyOSRelease = asetupReleaseIsOlderThan(asetup, 24)
            currentOS = os.environ['ALRB_USER_PLATFORM']
            if legacyOSRelease and "centos7" not in currentOS:
                osSetup = "centos7"
                msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, osSetup))
        # allow overriding the container OS using a flag
        if 'runInContainer' in self.conf.argdict:
            osSetup = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
            msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, osSetup))
        return osSetup

    # Check we actually have events to process!
    if self._inputEventTest and 'skipEvents' in self.conf.argdict:
        skipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        if skipEvents is not None:
            msg.debug('Will test for events to process')
            for dataType in input:
                inputEvents = self.conf.dataDictionary[dataType].nentries
                msg.debug('Got {} events for {}'.format(inputEvents, dataType))
                if not isinstance(inputEvents, int):
                    msg.warning('Are input events countable? Got nevents={} so disabling event count check for this input'.format(inputEvents))
                elif skipEvents >= inputEvents:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
                                                                    'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(skipEvents, inputEvents, dataType))

    # Data arguments of this substep. outputFiles is always needed by getTranslated
    # below, so it must exist even when there is no skeleton.
    inputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in input}
    outputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in output}

    if self._skeleton is not None:
        # See if we have any 'extra' file arguments
        for dataArg in self.conf.dataDictionary.values():
            if dataArg.io == 'input' and self._name in dataArg.executor:
                inputFiles[dataArg.subtype] = dataArg

        msg.info('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

        # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
        self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
                                                                       output = outputFiles)

    if input:
        self._extraMetadata['inputs'] = list(input)
    if output:
        self._extraMetadata['outputs'] = list(output)

    asetupString = None
    if 'asetup' in self.conf.argdict:
        asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
    else:
        msg.info('Asetup report: {0}'.format(asetupReport()))

    OSSetupString = selectContainerOS(asetupString, None)

    dbrelease = dbsetup = None
    if 'DBRelease' in self.conf.argdict:
        dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        if dbrelease:
            # Classic tarball - filename format is DBRelease-X.Y.Z.tar.gz
            dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
            if dbdMatch:
                msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
                if not os.access(dbrelease, os.R_OK):
                    msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                    msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                    dbrelease = dbdMatch.group(1)
                    dbsetup = cvmfsDBReleaseCheck(dbrelease)
                else:
                    # Check if the DBRelease is setup
                    unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                    if unpacked:
                        # Now run the setup.py script to customise the paths to the current location...
                        setupDBRelease(dbsetup)
            # For cvmfs we want just the X.Y.Z release string (and also support 'current')
            else:
                dbsetup = cvmfsDBReleaseCheck(dbrelease)

    # Look for environment updates and perpare the athena command line
    self._envUpdate = trfEnv.environmentUpdate()
    # above is needed by _prepAthenaCommandLine, but remove the setStandardEnvironment so doesn't include imf or tcmalloc
    # self._envUpdate.setStandardEnvironment(self.conf.argdict)
    self._prepAthenaCommandLine()

    # translate relevant parts from the runargs file
    if 'athenaHLT' in self._exe or 'athenaEF' in self._exe:
        self._cmd.remove('runargs.BSRDOtoRAW.py')
        # get list of translated arguments
        optionList = getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output = outputFiles)
        self._cmd.extend(optionList)
        # updates for CA
        if self._isCAEnabled():
            msg.info("Running in CA mode")
            # we don't use the runargs file so add the JO and preExecs to the command line
            self._cmd.append(self._skeletonCA)
            if 'preExec' in self.conf.argdict:
                self._cmd.extend(self.conf.argdict['preExec'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor))
            msg.info('Command adjusted for CA to %s', self._cmd)
        else:
            msg.info("Running in legacy mode")

    # Run preRun step debug stream analysis if output histogram are set
    if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
        # Do debug stream preRun step and get asetup string from debug stream input files
        dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary['BS_RDO'], self.conf.dataDictionary['HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
        # Setup asetup from debug stream
        # if no --asetup r2b:string was given and is not running with tzero/software/patches as TestArea
        if asetupString is None and dbgAsetupString is not None:
            asetupString = dbgAsetupString
            msg.info('Will use asetup string for debug stream analysis %s', dbgAsetupString)
            # The release may have changed, so re-evaluate the container OS
            OSSetupString = selectContainerOS(asetupString, OSSetupString)

        # Set database in command line if it was missing
        if 'useDB' in self.conf.argdict and 'DBserver' not in self.conf.argdict and dbAlias:
            msg.warning("Database alias will be set to %s", dbAlias)
            self._cmd.append("--db-server " + dbAlias)
    else:
        msg.info("Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")

    # The following is needed to avoid conflicts in finding BS files prduced by running the HLT step
    # and those already existing in the working directory
    if 'BS' in self.conf.dataDictionary or 'DRAW_TRIGCOST' in self.conf.dataDictionary or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
        # list of filenames of files matching expectedOutputFileName
        matchedOutputFileNames = self._findOutputFiles(self.expectedOutputFileName)
        # check there are no file matches
        if matchedOutputFileNames:
            msg.error(f'Directory already contains files with expected output name format ({self.expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                            f'Directory already contains files with expected output name format {self.expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')

    # Sanity check:
    if OSSetupString is not None and asetupString is None:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
                                                        'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')

    # Call athenaExecutor parent as the above overrides what athenaExecutor would have done
    super(athenaExecutor, self).preExecute(input, output)

    # Now we always write a wrapper, because it's very convenient for re-running individual substeps
    # This will have asetup and/or DB release setups in it
    # Do this last in this preExecute as the _cmd needs to be finalised
    msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
    self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
    msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))
STL class.

Member Data Documentation

◆ _cmd

python.trigRecoExe.trigRecoExecutor._cmd
protected

Definition at line 149 of file trigRecoExe.py.

◆ _envUpdate

python.trigRecoExe.trigRecoExecutor._envUpdate = trfEnv.environmentUpdate()
protected

Definition at line 131 of file trigRecoExe.py.

◆ _exe

str python.trigRecoExe.trigRecoExecutor._exe = 'setsid ' + self.conf.argdict['trigExe'].value
protected

Definition at line 43 of file trigRecoExe.py.

◆ _logFileName

python.trigRecoExe.trigRecoExecutor._logFileName
protected

Definition at line 367 of file trigRecoExe.py.

◆ _name

python.trigRecoExe.trigRecoExecutor._name
protected

Definition at line 202 of file trigRecoExe.py.

◆ _rc

python.trigRecoExe.trigRecoExecutor._rc = signal
protected

Definition at line 360 of file trigRecoExe.py.

◆ _skeleton

python.trigRecoExe.trigRecoExecutor._skeleton
protected

Definition at line 229 of file trigRecoExe.py.

◆ _skeletonCA

python.trigRecoExe.trigRecoExecutor._skeletonCA
protected

Definition at line 146 of file trigRecoExe.py.

◆ _substep

python.trigRecoExe.trigRecoExecutor._substep
protected

Definition at line 99 of file trigRecoExe.py.

◆ _topOptionsFiles

python.trigRecoExe.trigRecoExecutor._topOptionsFiles
protected
Initial value:
= self._jobOptionsTemplate.getTopOptions(input = inputFiles,
output = outputFiles)

Definition at line 75 of file trigRecoExe.py.

◆ expectedOutputFileName

python.trigRecoExe.trigRecoExecutor.expectedOutputFileName = '*SingleStream.daq.RAW._*.data'
static

Definition at line 33 of file trigRecoExe.py.

◆ name

python.trigRecoExe.trigRecoExecutor.name

Definition at line 40 of file trigRecoExe.py.


The documentation for this class was generated from the following file: