ATLAS Offline Software
Loading...
Searching...
No Matches
ExecStep.py
Go to the documentation of this file.
2# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3#
4
5'''
6Definitions of exec steps (main job) in Trigger ART tests
7'''
8
9import os
10import re
11
12from TrigValTools.TrigValSteering.Step import Step
13from TrigValTools.TrigValSteering.Input import is_input_defined, get_input
14
15
16class ExecStep(Step):
17 '''
18 Step executing the main job of a Trigger ART test. This can be either
19 athena or athenaHLT or a transform. There can be several ExecSteps in
20 one Test
21 '''
22
# Constructor. Passes `name` on to the base-class __init__ and then sets the
# defaults of the ExecStep-specific options listed below.
# NOTE(review): this listing is an extract with gaps -- its own line numbers
# 30, 42 and 44 are absent, so further attribute defaults may be missing here
# (configure_args reads self.concurrent_events, which is not set below).
# Confirm against the upstream file before relying on this list.
23 def __init__(self, name=None):
24 super(ExecStep, self).__init__(name)
# Input keyword or comma-separated explicit paths; an empty string means
# "no input", while None is rejected when the input gets configured.
25 self.input = None
# Set from get_input() when `input` is a defined keyword.
26 self.input_object = None
# Appended to the command line by configure_args when not None.
27 self.job_options = None
# List/tuple of flags; appended to args, or passed as preExec for transforms.
28 self.flags = []
# Passed as --threads=N when not None.
29 self.threads = None
# Passed as --nprocs=N when not None.
31 self.forks = None
# None selects a default in configure_args based on test.art_type and
# test.package_name.
32 self.max_events = None
# Passed as a skip-events option when not None.
33 self.skip_events = None
# When True, the job options check is skipped.
34 self.use_pickle = False
# Adds --imf to the athena options for every type except 'other'.
35 self.imf = True
# Adds a --perfmon option for athena/athenaHLT; configure_args switches it
# off for forks > 1 and for transforms.
36 self.perfmon = True
# Appends the flag 'Exec.FPE=1' for every type except 'other'.
37 self.fpe_auditor = True
# Appends the flag 'Trigger.CostMonitoring.monitorAllEvents=True'.
38 self.costmon = False
# Adds --stdcmalloc to the athena options.
39 self.malloc = False
# configure_args forces this to False for Reco_tf and Derivation_tf and
# otherwise uses it to decide whether to set prmon_interval.
40 self.prmon = True
# Adds --dump-config-exit or --config-only=<name>.pkl depending on the type.
41 self.config_only = False
# Not read anywhere in this listing; presumably consumed by the base class
# or the test steering -- verify.
43 self.required = True
45
def construct_name(self):
    '''
    Build the name of this step.

    A named step of type 'other' keeps its name unchanged. In all other
    cases the result is the type (or the executable if the type is unset
    or empty), followed by '.<name>' when a name was given.
    '''
    if self.name and self.type == 'other':
        return self.name

    # Fall back to the executable when no type is available
    base = self.type or self.executable
    if not self.name:
        return base
    return base + '.' + self.name
53
# NOTE(review): the `def` line of this method (listing line 54) is missing from
# this extract; only the body is visible. Restore the header from the upstream
# file -- presumably this is the getter for the step's log file name, but the
# method name cannot be confirmed from here.
# Body: an explicitly set self.log_file_name wins; otherwise the file name is
# derived as construct_name() + '.log'.
55 if self.log_file_name is not None:
56 return self.log_file_name
57 else:
58 return self.construct_name()+'.log'
59
# Configure this step for the given test: fix the step name and log file name,
# run the individual configure_* stages and finally delegate to the base-class
# configure(test).
# NOTE(review): listing line 69 is absent from this extract, i.e. one statement
# between configure_input() and configure_args(test) is missing -- presumably
# the call running the job options check. Restore it from the upstream file.
60 def configure(self, test):
61 # Construct the name
# The name is built before configure_type() runs, so an unset type has not yet
# been replaced by 'other' at this point and construct_name() falls back to
# the executable.
62 self.name = self.construct_name()
63 self.log_file_name = self.name+'.log'
64 self.log.debug('Configuring step %s', self.name)
65
66 # Parse configuration
67 self.configure_type()
68 self.configure_input()
70 self.configure_args(test)
71
72 super(ExecStep, self).configure(test)
73
def configure_type(self):
    '''
    Validate the step type and derive the executable from it.

    For the known types the executable is always '<type>.py'; an explicitly
    set executable is ignored with a warning. An unset type is treated as
    'other', in which case the executable is left as provided. Any other
    type value is a misconfiguration.

    For transform executables ('*_tf.py') the process environment is
    modified: TRF_NOECHO is set and TRF_ECHO, ATHENA_NPROC_NUM and
    ATHENA_CORE_NUMBER are removed.
    '''
    self.log.debug('Configuring type for step %s', self.name)
    # Check if type or executable is specified
    # (misconfig_abort is defined outside this view and is expected not to
    # return -- TODO confirm)
    if self.type is None and self.executable is None:
        self.misconfig_abort('Cannot configure a step without specified type or executable')

    # Configure executable from type
    known_types = ('athena', 'athenaHLT', 'Reco_tf', 'Trig_reco_tf', 'Derivation_tf')
    if self.type in known_types:
        if self.executable is not None:
            self.log.warning('type=%s was specified, so executable=%s '
                             'will be ignored and deduced automatically '
                             'from type', self.type, self.executable)
        self.executable = '{}.py'.format(self.type)
    elif self.type == 'other' or self.type is None:
        self.type = 'other'
    else:
        self.misconfig_abort('Cannot determine type of this step')

    if self.executable.endswith('_tf.py'):
        # Ensure no log duplication for transforms
        os.environ['TRF_NOECHO'] = '1'
        os.environ.pop('TRF_ECHO', None)

        # We don't use the Reco_tf auto-configuration for MP/MT transforms,
        # instead we pass our parameters of choice to athenaopts
        os.environ.pop('ATHENA_NPROC_NUM', None)
        os.environ.pop('ATHENA_CORE_NUMBER', None)
106
def configure_input(self):
    '''
    Validate self.input and resolve it where possible.

    - None is a misconfiguration (use an empty string for "no input").
    - An empty string means the step has no input; nothing is done.
    - If the value is a defined input keyword, the corresponding object
      from get_input() is stored in self.input_object.
    - Otherwise the value is taken as a comma-separated list of explicit
      paths, each of which has to be an existing file.
    '''
    # NOTE(review): the `def` line was missing from the listing this block was
    # recovered from; the name and signature follow the self.configure_input()
    # call in configure() -- confirm against the upstream file.
    self.log.debug('Configuring input for step %s', self.name)
    if self.input is None:
        self.misconfig_abort(
            'Input not provided for this step. To configure '
            'a step without input, use an empty string')

    # Step with no input
    if len(self.input) == 0:
        return

    # Try to interpret input as keyword
    if is_input_defined(self.input):
        self.input_object = get_input(self.input)
        if self.input_object is None:
            self.misconfig_abort('Failed to load input with keyword %s', self.input)
        return

    # Try to interpret explicit paths
    for path in self.input.split(','):
        if not os.path.isfile(path):
            # Report the path that failed rather than the whole input list
            self.misconfig_abort('The provided input does not exist: %s', path)
    self.log.debug('Using explicit input: %s', self.input)
131
# NOTE(review): the `def` line of this method (listing line 132) is missing
# from this extract; only the docstring and body are visible. Restore the
# header from the upstream file -- the method name cannot be confirmed here.
# Body summary: the check is skipped for type 'other' and for steps running
# from a pickle file; transforms must NOT have job options; every remaining
# type must have non-empty job options.
133 '''Check job options configuration'''
134 self.log.debug('Configuring job options for step %s', self.name)
135 if self.type == 'other':
136 self.log.debug('Skipping job options check for step.type=other')
137 return
138 if self.use_pickle:
139 self.log.debug('Skipping job options check for step running from a pickle file')
140 return
141
# Transform types are recognised by the '_tf' suffix of the type name.
142 if self.type.endswith('_tf'):
143 if self.job_options is None:
144 return
145 else:
146 self.misconfig_abort('Transform %s does not accept job options', self.type)
147 elif self.job_options is None or len(self.job_options) == 0:
148 self.misconfig_abort('Job options not provided for this step')
149
def add_trf_precommand(self, precommand):
    '''
    Add a preExec command to the arguments of this step.

    For the transform types the command is prefixed with a substep name
    chosen from what is found in self.args ('BSRDOtoRAW:' or
    'RDOtoRDOTrigger:'); if neither case applies the command is not added
    at all. The new --preExec is inserted in place of the first existing
    '--preExec' option marker, so that previously given values follow it.
    '''
    if self.type in ('Reco_tf', 'Trig_reco_tf', 'Derivation_tf'):
        if 'inputBS_RDOFile' in self.args:
            precommand = 'BSRDOtoRAW:' + precommand
        elif 'outputRDO_TRIGFile' in self.args or 'doRDO_TRIG' in self.args:
            precommand = 'RDOtoRDOTrigger:' + precommand
        else:
            self.log.debug('Skip adding precommand %s to step %s because it is a transform which '
                           'does not run Trigger', precommand, self.name)
            return

    new_option = f' --preExec "{precommand}" '

    # match --preExec, --preExec= ignoring spaces
    found = re.search(r'--preExec\s*=?\s*', self.args)
    if found is None:
        self.args += new_option
        return

    # Insert new preExec. It is important to not use the '=' sign so we
    # can chain multiple preExecs.
    self.args = self.args[:found.start()] + new_option + self.args[found.end():]
171
# Assemble the command-line string self.args for this step. In order, it
# appends: the athena options (imf, perfmon, stdcmalloc, config-only, threads,
# concurrent events, forks), the max/skip events options, the input, the job
# options and the flags; the result is stripped of surrounding whitespace.
# `test` is read for package_name and art_type, which select some defaults.
# Besides self.args this also updates self.prmon, self.perfmon, self.flags,
# self.threads, self.concurrent_events, self.forks, self.max_events and
# self.prmon_interval.
# NOTE(review): this listing is an extract with gaps -- its own line numbers
# 239 and 263 are absent (see the notes at those places below). Restore them
# from the upstream file before using this code.
172 def configure_args(self, test):
173 self.log.debug('Configuring args for step %s', self.name)
174 if self.args is None:
175 self.args = ''
176 athenaopts = ''
177
178 # Disable prmon for Reco_tf because it is already started inside the transform
179 if self.type == 'Reco_tf' or self.type == 'Derivation_tf':
180 self.prmon = False
181
182 # Disable perfmon for multi-fork jobs as it cannot deal well with them
183 if self.forks and self.forks > 1 and self.perfmon:
184 self.log.debug('Disabling perfmon because forks=%d > 1', self.forks)
185 self.perfmon = False
186 # Disable perfmon for transforms (Reco_tf enables it itself, Trig_reco_tf would need special handling
187 # depending on whether it runs athena or athenaHLT)
188 if self.type.endswith('_tf') and self.perfmon:
189 self.log.debug('Disabling perfmon for the transform step type %s', self.type)
190 self.perfmon = False
191
192 # Append imf/perfmon
193 if self.type != 'other':
194 if self.imf:
195 athenaopts += ' --imf'
196 if self.perfmon:
197 if self.type == 'athenaHLT':
198 athenaopts += ' --perfmon'
199 elif self.type == 'athena':
200 athenaopts += ' --perfmon=fastmonmt'
201 if self.malloc:
202 athenaopts += " --stdcmalloc "
203
204 # Enable CostMonitoring/FPEAuditor
# self.flags is extended in place here, so running this method a second time
# on the same step appends the same flags once more.
205 if self.type != 'other':
206 if self.costmon:
207 self.flags.append('Trigger.CostMonitoring.monitorAllEvents=True')
208 if self.fpe_auditor:
209 self.flags.append('Exec.FPE=1')
210
211 # Run config-only if requested
212 if self.config_only :
213
214 if self.type == 'athenaHLT' or (self.type == "other" and self.executable == "athenaHLT.py") :
215 athenaopts += ' --dump-config-exit'
216
217 elif self.type == 'athena' or self.type == 'Reco_tf' or self.type == 'Derivation_tf' or (self.type == "other" and self.executable == "athena.py") :
218 athenaopts += ' --config-only=' + self.name + '.pkl'
219
220 # No current support if it isn't clear exactly what's being run
221 # This includes Trig_reco_tf and 'other' where the executable is not known
222 else :
223 self.misconfig_abort('Cannot determine what config-only option is needed. Consider adding the appropriate flag to "args" instead.')
224
225 # Default threads/concurrent_events/forks
226 if test.package_name == 'TrigP1Test' and self.type == 'athenaHLT':
227 if self.threads is None:
228 self.threads = 1
229 if self.concurrent_events is None:
230 self.concurrent_events = 1
231 if self.forks is None:
232 self.forks = 1
233
234 # Append threads/concurrent_events/forks
235 if self.threads is not None:
236 athenaopts += ' --threads={}'.format(self.threads)
237 if self.concurrent_events is not None:
238 athenaopts += ' --concurrent-events={}'.format(
# NOTE(review): listing line 239 is missing here -- the format() call above is
# left unclosed. Presumably it is the argument `self.concurrent_events)`;
# confirm against the upstream file.
240 if self.forks is not None:
241 athenaopts += ' --nprocs={}'.format(self.forks)
242
243 # Append athenaopts
# Transforms receive the collected options wrapped in a single --athenaopts
# argument; for all other types they are appended to args directly.
244 athenaopts = athenaopts.strip()
245 if self.type.endswith('_tf'):
246 self.args += ' --athenaopts="{}"'.format(athenaopts)
247 else:
248 self.args += ' '+athenaopts
249
250 # Default max events
251 if self.max_events is None:
252 if test.art_type == 'build':
253 if test.package_name == 'TrigP1Test':
254 self.max_events = 80
255 else:
256 self.max_events = 20
257 else:
258 self.max_events = 1000
259
260 # Set prmon interval based on max events
261 if self.prmon:
262 if self.max_events <= 100:
# NOTE(review): listing line 263 is missing here -- the body of the `if` above
# (presumably an assignment to self.prmon_interval for short jobs) cannot be
# recovered from this extract; take the value from the upstream file.
264 else:
265 self.prmon_interval = 10
266
267 # Append max/skip events
# Each type spells these options differently; type 'other' gets neither.
268 if self.type == 'athena':
269 self.args += ' --evtMax={}'.format(self.max_events)
270 elif self.type == 'athenaHLT':
271 self.args += ' --number-of-events={}'.format(self.max_events)
272 elif self.type.endswith('_tf'):
273 self.args += ' --maxEvents={}'.format(self.max_events)
274 if self.skip_events is not None:
275 if self.type == 'athena':
276 self.args += ' --skipEvents={}'.format(self.skip_events)
277 elif self.type == 'athenaHLT':
278 self.args += ' --skip-events={}'.format(self.skip_events)
279 elif self.type.endswith('_tf'):
280 self.args += ' --skipEvents={}'.format(self.skip_events)
281
282 # Append input
283 if len(self.input) > 0:
284 if self.input_object is not None:
# For athenaHLT the paths are pre-joined with ' --file=' so that the
# split(',') further down yields one chunk and every path ends up with its own
# --file option (assumes the paths themselves contain no commas -- verify).
285 if self.type == 'athenaHLT':
286 input_str = ' --file='.join(self.input_object.paths)
287 else:
288 input_str = ','.join(self.input_object.paths)
289 else:
290 input_str = self.input
291 if self.type == 'athena':
292 self.args += ' --filesInput={}'.format(input_str)
293 elif self.type == 'athenaHLT':
294 self.args += ''.join([f" --file={inputFile}" for inputFile in input_str.split(',')])
295 elif self.type.endswith('_tf'):
# The transform option name is built from input_object.format, so explicit
# paths (no input object) cannot be used for transforms.
296 if self.input_object is None:
297 self.misconfig_abort(
298 'Cannot build inputXYZFile string for transform '
299 ' from explicit input path. Use input=\'\' and '
300 'specify the input explicitly in args')
301 if self.type == 'Trig_reco_tf' and '--prodSysBSRDO True' in self.args:
302 self.args += ' --inputBS_RDOFile={}'.format(input_str)
303 else:
304 self.args += ' --input{}File={}'.format(
305 self.input_object.format, input_str)
306
307
308 # Append job options
309 if self.job_options is not None:
310 self.args += ' '+self.job_options
311
312 # Append flags
313 if self.flags:
314 if not isinstance(self.flags, (list, tuple)):
315 self.misconfig_abort('Wrong type for flags. Expected list or tuple.')
316
317 if self.type.endswith('_tf'): # for transform, set flags as pre-exec
318 if self.type == 'Trig_reco_tf':
319 # No 'flags.' prefix for the trigger transform
320 self.add_trf_precommand(' '.join(f'{flag}' for flag in self.flags))
321 else:
322 self.add_trf_precommand(';'.join(f'flags.{flag}' for flag in self.flags))
323 else: # athena(HLT)
324 self.args += ' ' + ' '.join(self.flags)
325
326 # Strip extra whitespace
327 self.args = self.args.strip()
const bool debug
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177