ATLAS Offline Software
ExecStep.py
Go to the documentation of this file.
1 #
2 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 #
4 
5 '''
6 Definitions of exec steps (main job) in Trigger ART tests
7 '''
8 
9 import os
10 import re
11 
12 from TrigValTools.TrigValSteering.Step import Step
13 from TrigValTools.TrigValSteering.Input import is_input_defined, get_input
14 
15 
16 class ExecStep(Step):
17  '''
18  Step executing the main job of a Trigger ART test. This can be either
19  athena or athenaHLT or a transform. There can be several ExecSteps in
20  one Test
21  '''
22 
23  def __init__(self, name=None):
24  super(ExecStep, self).__init__(name)
25  self.input = None
26  self.input_object = None
27  self.job_options = None
28  self.flags = []
29  self.threads = None
30  self.concurrent_events = None
31  self.forks = None
32  self.max_events = None
33  self.skip_events = None
34  self.use_pickle = False
35  self.imf = True
36  self.perfmon = True
37  self.fpe_auditor = True
38  self.costmon = False
39  self.malloc = False
40  self.prmon = True
41  self.config_only = False
42  self.auto_report_result = True
43  self.required = True
44  self.depends_on_previous = True
45 
46  def construct_name(self):
47  if self.name and self.type == 'other':
48  return self.name
49  name = self.type if self.type else self.executable
50  if self.name:
51  name += '.'+self.name
52  return name
53 
54  def get_log_file_name(self):
55  if self.log_file_name is not None:
56  return self.log_file_name
57  else:
58  return self.construct_name()+'.log'
59 
60  def configure(self, test):
61  # Construct the name
62  self.name = self.construct_name()
63  self.log_file_name = self.name+'.log'
64  self.log.debug('Configuring step %s', self.name)
65 
66  # Parse configuration
67  self.configure_type()
68  self.configure_input()
70  self.configure_args(test)
71 
72  super(ExecStep, self).configure(test)
73 
74  def configure_type(self):
75  self.log.debug('Configuring type for step %s', self.name)
76  # Check if type or executable is specified
77  if self.type is None and self.executable is None:
78  self.misconfig_abort('Cannot configure a step without specified type or executable')
79 
80  # Configure executable from type
81  known_types = ['athena', 'athenaHLT', 'Reco_tf', 'Trig_reco_tf', 'Derivation_tf']
82  if self.type in known_types:
83  if self.executable is not None:
84  self.log.warning('type=%s was specified, so executable=%s '
85  'will be ignored and deduced automatically '
86  'from type', self.type, self.executable)
87  self.executable = '{}.py'.format(self.type)
88  elif self.type == 'other' or self.type is None:
89  self.type = 'other'
90  else:
91  self.misconfig_abort('Cannot determine type of this step')
92 
93  if self.executable.endswith('_tf.py'):
94  def del_env(varname):
95  if varname in os.environ:
96  del os.environ[varname]
97 
98  # Ensure no log duplication for transforms
99  os.environ['TRF_NOECHO'] = '1'
100  del_env('TRF_ECHO')
101 
102  # We don't use the Reco_tf auto-configuration for MP/MT transforms,
103  # instead we pass our parameters of choice to athenaopts
104  del_env('ATHENA_NPROC_NUM')
105  del_env('ATHENA_CORE_NUMBER')
106 
107  def configure_input(self):
108  self.log.debug('Configuring input for step %s', self.name)
109  if self.input is None:
110  self.misconfig_abort(
111  'Input not provided for this step. To configure'
112  'a step without input, use an empty string')
113 
114  # Step with no input
115  if len(self.input) == 0:
116  return
117 
118  # Try to interpret input as keyword
119  if is_input_defined(self.input):
120  self.input_object = get_input(self.input)
121  if self.input_object is None:
122  self.misconfig_abort('Failed to load input with keyword %s', self.input)
123  return
124 
125  # Try to interpret explicit paths
126  input_paths = self.input.split(',')
127  for path in input_paths:
128  if not os.path.isfile(path):
129  self.misconfig_abort('The provided input does not exist: %s', self.input)
130  self.log.debug('Using explicit input: %s', self.input)
131 
133  '''Check job options configuration'''
134  self.log.debug('Configuring job options for step %s', self.name)
135  if self.type == 'other':
136  self.log.debug('Skipping job options check for step.type=other')
137  return
138  if self.use_pickle:
139  self.log.debug('Skipping job options check for step running from a pickle file')
140  return
141 
142  if self.type.endswith('_tf'):
143  if self.job_options is None:
144  return
145  else:
146  self.misconfig_abort('Transform %s does not accept job options', self.type)
147  elif self.job_options is None or len(self.job_options) == 0:
148  self.misconfig_abort('Job options not provided for this step')
149 
150  def add_trf_precommand(self, precommand):
151  '''Add preExec to transform command'''
152 
153  if self.type in ['Reco_tf', 'Trig_reco_tf', 'Derivation_tf']:
154  if 'inputBS_RDOFile' in self.args:
155  precommand = 'BSRDOtoRAW:' + precommand
156  elif 'outputRDO_TRIGFile' in self.args or 'doRDO_TRIG' in self.args:
157  precommand = 'RDOtoRDOTrigger:' + precommand
158  else:
159  self.log.debug('Skip adding precommand %s to step %s because it is a transform which '
160  'does not run Trigger', precommand, self.name)
161  return
162 
163  # match --preExec, --preExec= ignoring spaces
164  m = re.search(r'--preExec\s*=?\s*', self.args)
165  if m is None:
166  self.args += f' --preExec "{precommand}" '
167  else:
168  # Insert new preExec. It is important to not use the '=' sign so we
169  # can chain multiple preExecs.
170  self.args = self.args[:m.span()[0]] + f' --preExec "{precommand}" ' + self.args[m.span()[1]:]
171 
172  def configure_args(self, test):
173  self.log.debug('Configuring args for step %s', self.name)
174  if self.args is None:
175  self.args = ''
176  athenaopts = ''
177 
178  # Disable prmon for Reco_tf because it is already started inside the transform
179  if self.type == 'Reco_tf' or self.type == 'Derivation_tf':
180  self.prmon = False
181 
182  # Disable perfmon for multi-fork jobs as it cannot deal well with them
183  if self.forks and self.forks > 1 and self.perfmon:
184  self.log.debug('Disabling perfmon because forks=%d > 1', self.forks)
185  self.perfmon = False
186  # Disable perfmon for transforms (Reco_tf enables it itself, Trig_reco_tf would need special handling
187  # depending on whether it runs athena or athenaHLT)
188  if self.type.endswith('_tf') and self.perfmon:
189  self.log.debug('Disabling perfmon for the transform step type %s', self.type)
190  self.perfmon = False
191 
192  # Append imf/perfmon
193  if self.type != 'other':
194  if self.imf:
195  athenaopts += ' --imf'
196  if self.perfmon:
197  if self.type == 'athenaHLT':
198  athenaopts += ' --perfmon'
199  elif self.type == 'athena':
200  athenaopts += ' --perfmon=fastmonmt'
201  if self.malloc:
202  athenaopts += " --stdcmalloc "
203 
204  # Enable CostMonitoring/FPEAuditor
205  if self.type != 'other':
206  if self.costmon:
207  self.flags.append('Trigger.CostMonitoring.monitorAllEvents=True')
208  if self.fpe_auditor:
209  self.flags.append('Exec.FPE=1')
210 
211  # Run config-only if requested
212  if self.config_only :
213 
214  if self.type == 'athenaHLT' or (self.type == "other" and self.executable == "athenaHLT.py") :
215  athenaopts += ' --dump-config-exit'
216 
217  elif self.type == 'athena' or self.type == 'Reco_tf' or self.type == 'Derivation_tf' or (self.type == "other" and self.executable == "athena.py") :
218  athenaopts += ' --config-only=' + self.name + '.pkl'
219 
220  # No current support if it isn't clear exactly what's being run
221  # This includes Trig_reco_tf and 'other' where the executable is not known
222  else :
223  self.misconfig_abort('Cannot determine what config-only option is needed. Consider adding the appropriate flag to "args" instead.')
224 
225  # Default threads/concurrent_events/forks
226  if test.package_name == 'TrigP1Test' and self.type == 'athenaHLT':
227  if self.threads is None:
228  self.threads = 1
229  if self.concurrent_events is None:
230  self.concurrent_events = 1
231  if self.forks is None:
232  self.forks = 1
233 
234  # Append threads/concurrent_events/forks
235  if self.threads is not None:
236  athenaopts += ' --threads={}'.format(self.threads)
237  if self.concurrent_events is not None:
238  athenaopts += ' --concurrent-events={}'.format(
239  self.concurrent_events)
240  if self.forks is not None:
241  athenaopts += ' --nprocs={}'.format(self.forks)
242 
243  # Append athenaopts
244  athenaopts = athenaopts.strip()
245  if self.type.endswith('_tf'):
246  self.args += ' --athenaopts="{}"'.format(athenaopts)
247  else:
248  self.args += ' '+athenaopts
249 
250  # Default max events
251  if self.max_events is None:
252  if test.art_type == 'build':
253  if test.package_name == 'TrigP1Test':
254  self.max_events = 80
255  else:
256  self.max_events = 20
257  else:
258  self.max_events = 1000
259 
260  # Set prmon interval based on max events
261  if self.prmon:
262  if self.max_events <= 100:
263  self.prmon_interval = 5
264  else:
265  self.prmon_interval = 10
266 
267  # Append max/skip events
268  if self.type == 'athena':
269  self.args += ' --evtMax={}'.format(self.max_events)
270  elif self.type == 'athenaHLT':
271  self.args += ' --number-of-events={}'.format(self.max_events)
272  elif self.type.endswith('_tf'):
273  self.args += ' --maxEvents={}'.format(self.max_events)
274  if self.skip_events is not None:
275  if self.type == 'athena':
276  self.args += ' --skipEvents={}'.format(self.skip_events)
277  elif self.type == 'athenaHLT':
278  self.args += ' --skip-events={}'.format(self.skip_events)
279  elif self.type.endswith('_tf'):
280  self.args += ' --skipEvents={}'.format(self.skip_events)
281 
282  # Append input
283  if len(self.input) > 0:
284  if self.input_object is not None:
285  if self.type == 'athenaHLT':
286  input_str = ' --file='.join(self.input_object.paths)
287  else:
288  input_str = ','.join(self.input_object.paths)
289  else:
290  input_str = self.input
291  if self.type == 'athena':
292  self.args += ' --filesInput={}'.format(input_str)
293  elif self.type == 'athenaHLT':
294  self.args += ''.join([f" --file={inputFile}" for inputFile in input_str.split(',')])
295  elif self.type.endswith('_tf'):
296  if self.input_object is None:
297  self.misconfig_abort(
298  'Cannot build inputXYZFile string for transform '
299  ' from explicit input path. Use input=\'\' and '
300  'specify the input explicitly in args')
301  if self.type == 'Trig_reco_tf' and '--prodSysBSRDO True' in self.args:
302  self.args += ' --inputBS_RDOFile={}'.format(input_str)
303  else:
304  self.args += ' --input{}File={}'.format(
305  self.input_object.format, input_str)
306 
307 
308  # Append job options
309  if self.job_options is not None:
310  self.args += ' '+self.job_options
311 
312  # Append flags
313  if self.flags:
314  if not isinstance(self.flags, (list, tuple)):
315  self.misconfig_abort('Wrong type for flags. Expected list or tuple.')
316 
317  if self.type.endswith('_tf'): # for transform, set flags as pre-exec
318  if self.type == 'Trig_reco_tf':
319  # No 'flags.' prefix for the trigger transform
320  self.add_trf_precommand(' '.join(f'{flag}' for flag in self.flags))
321  else:
322  self.add_trf_precommand(';'.join(f'flags.{flag}' for flag in self.flags))
323  else: # athena(HLT)
324  self.args += ' ' + ' '.join(self.flags)
325 
326  # Strip extra whitespace
327  self.args = self.args.strip()
python.TrigValSteering.ExecStep.ExecStep.configure_type
def configure_type(self)
Definition: ExecStep.py:74
python.TrigValSteering.ExecStep.ExecStep.auto_report_result
auto_report_result
Definition: ExecStep.py:42
python.TrigValSteering.ExecStep.ExecStep.construct_name
def construct_name(self)
Definition: ExecStep.py:46
python.TrigValSteering.ExecStep.ExecStep.imf
imf
Definition: ExecStep.py:35
vtune_athena.format
format
Definition: vtune_athena.py:14
python.TrigValSteering.ExecStep.ExecStep.job_options
job_options
Definition: ExecStep.py:27
python.TrigValSteering.ExecStep.ExecStep.get_log_file_name
def get_log_file_name(self)
Definition: ExecStep.py:54
python.TrigValSteering.ExecStep.ExecStep.type
type
Definition: ExecStep.py:47
python.TrigValSteering.ExecStep.ExecStep.required
required
Definition: ExecStep.py:43
python.TrigValSteering.ExecStep.ExecStep.executable
executable
Definition: ExecStep.py:87
python.TrigValSteering.ExecStep.ExecStep.config_only
config_only
Definition: ExecStep.py:41
python.TrigValSteering.Input.get_input
def get_input(keyword)
Definition: Input.py:96
python.TrigValSteering.ExecStep.ExecStep.configure_job_options
def configure_job_options(self)
Definition: ExecStep.py:132
python.TrigValSteering.ExecStep.ExecStep.__init__
def __init__(self, name=None)
Definition: ExecStep.py:23
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.TrigValSteering.ExecStep.ExecStep.skip_events
skip_events
Definition: ExecStep.py:33
python.TrigValSteering.ExecStep.ExecStep.threads
threads
Definition: ExecStep.py:29
python.TrigValSteering.ExecStep.ExecStep.malloc
malloc
Definition: ExecStep.py:39
python.TrigValSteering.ExecStep.ExecStep.input
input
Definition: ExecStep.py:25
python.TrigValSteering.ExecStep.ExecStep.depends_on_previous
depends_on_previous
Definition: ExecStep.py:44
python.TrigValSteering.ExecStep.ExecStep.forks
forks
Definition: ExecStep.py:31
python.TrigValSteering.ExecStep.ExecStep.use_pickle
use_pickle
Definition: ExecStep.py:34
python.TrigValSteering.ExecStep.ExecStep.perfmon
perfmon
Definition: ExecStep.py:36
python.TrigValSteering.ExecStep.ExecStep.configure
def configure(self, test)
Definition: ExecStep.py:60
python.TrigValSteering.ExecStep.ExecStep.flags
flags
Definition: ExecStep.py:28
python.TrigValSteering.Input.is_input_defined
def is_input_defined(keyword)
Definition: Input.py:90
python.TrigValSteering.ExecStep.ExecStep
Definition: ExecStep.py:16
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.TrigValSteering.ExecStep.ExecStep.prmon
prmon
Definition: ExecStep.py:40
python.TrigValSteering.ExecStep.ExecStep.add_trf_precommand
def add_trf_precommand(self, precommand)
Definition: ExecStep.py:150
debug
const bool debug
Definition: MakeUncertaintyPlots.cxx:53
python.TrigValSteering.ExecStep.ExecStep.fpe_auditor
fpe_auditor
Definition: ExecStep.py:37
python.TrigValSteering.ExecStep.ExecStep.name
name
Definition: ExecStep.py:62
python.TrigValSteering.ExecStep.ExecStep.max_events
max_events
Definition: ExecStep.py:32
python.TrigValSteering.ExecStep.ExecStep.args
args
Definition: ExecStep.py:170
python.TrigValSteering.ExecStep.ExecStep.prmon_interval
prmon_interval
Definition: ExecStep.py:263
python.TrigValSteering.ExecStep.ExecStep.log_file_name
log_file_name
Definition: ExecStep.py:63
python.TrigValSteering.ExecStep.ExecStep.configure_input
def configure_input(self)
Definition: ExecStep.py:107
python.TrigValSteering.ExecStep.ExecStep.configure_args
def configure_args(self, test)
Definition: ExecStep.py:172
python.TrigValSteering.ExecStep.ExecStep.costmon
costmon
Definition: ExecStep.py:38
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
python.TrigValSteering.ExecStep.ExecStep.concurrent_events
concurrent_events
Definition: ExecStep.py:30
python.TrigValSteering.ExecStep.ExecStep.input_object
input_object
Definition: ExecStep.py:26