ATLAS Offline Software
Loading...
Searching...
No Matches
ExecStep.py
Go to the documentation of this file.
2# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3#
4
5'''
6Definitions of exec steps (main job) in Trigger ART tests
7'''
8
9import os
10import re
11
12from TrigValTools.TrigValSteering.Step import Step
13from TrigValTools.TrigValSteering.Input import is_input_defined, get_input
14
15
16class ExecStep(Step):
17 '''
18 Step executing the main job of a Trigger ART test. This can be either
19 athena or athenaHLT or a transform. There can be several ExecSteps in
20 one Test
21 '''
22
def __init__(self, name=None):
    '''
    Create an exec step with default settings.

    name -- optional step name, forwarded to the base class constructor;
            construct_name() later combines it with the step type
    '''
    super(ExecStep, self).__init__(name)
    # Input keyword or comma-separated explicit paths; '' means "no input",
    # None is rejected by configure_input()
    self.input = None
    # Object returned by get_input() when self.input is a known keyword
    self.input_object = None
    # Job options appended verbatim to the arguments (not allowed for transforms)
    self.job_options = None
    # Flags appended to the arguments, or passed as preExec for transforms
    self.flags = []
    # Translated to --threads / --concurrent-events / --nprocs when not None
    self.threads = None
    # Read by configure_args(); it has to exist even when the user never sets it
    self.concurrent_events = None
    self.forks = None
    # Event range; configure_args() picks a default for max_events when None
    self.max_events = None
    self.skip_events = None
    # When True the job options check is skipped
    self.use_pickle = False
    # Switches translated to --imf, --perfmon, Exec.FPE=1,
    # Trigger.CostMonitoring.monitorAllEvents=True and --stdcmalloc
    self.imf = True
    self.perfmon = True
    self.fpe_auditor = True
    self.costmon = False
    self.malloc = False
    # When True configure_args() chooses prmon_interval from max_events
    self.prmon = True
    # When True the job only dumps its configuration
    self.config_only = False
    # NOTE(review): the listing this was taken from skips source lines 42 and 44
    # around this point - confirm no further attribute defaults belong here.
    self.required = True
45
def construct_name(self):
    '''
    Return the name of this step.

    A named step of type 'other' keeps its own name. Otherwise the name is
    the step type (or the executable when no type is set), followed by
    '.<name>' if a name was given.
    '''
    own_name = self.name
    if own_name and self.type == 'other':
        return own_name

    prefix = self.type or self.executable
    if not own_name:
        return prefix
    return prefix + '.' + own_name
53
def get_log_file_name(self):
    '''
    Return the log file name of this step.

    An explicitly set log_file_name wins; otherwise the name is derived
    from construct_name() with a '.log' suffix.

    NOTE(review): the 'def' line of this method was absent from the listing;
    the method name is inferred from the attribute it resolves - confirm
    against the repository.
    '''
    if self.log_file_name is None:
        return self.construct_name() + '.log'
    return self.log_file_name
59
def configure(self, test):
    '''
    Finalise the step name and log file name, run the individual
    configure_* stages and then hand over to the base class configure().

    test -- the test object this step belongs to; passed on to
            configure_args() and to the base class
    '''
    # Name first: every later stage logs with it
    step_name = self.construct_name()
    self.name = step_name
    self.log_file_name = step_name + '.log'
    self.log.debug('Configuring step %s', self.name)

    # Parse the configuration stage by stage
    # NOTE(review): the listing skips one source line between configure_input()
    # and configure_args() - presumably another configure_* call; confirm
    # against the repository.
    self.configure_type()
    self.configure_input()
    self.configure_args(test)

    super(ExecStep, self).configure(test)
73
def configure_type(self):
    '''
    Resolve the step type and the executable to run.

    For the known types the executable is always '<type>.py' (a user-set
    executable is ignored with a warning). A missing type or type 'other'
    keeps the user-set executable. Any other type is a misconfiguration.
    For transform executables ('*_tf.py') the process environment is
    adjusted as described below.
    '''
    self.log.debug('Configuring type for step %s', self.name)

    # At least one of type / executable has to be given
    if self.type is None and self.executable is None:
        self.misconfig_abort('Cannot configure a step without specified type or executable')

    # Types for which the executable is deduced automatically
    auto_types = ('athena', 'athenaHLT', 'athenaEF', 'Reco_tf', 'Trig_reco_tf', 'Derivation_tf')
    if self.type in auto_types:
        if self.executable is not None:
            self.log.warning('type=%s was specified, so executable=%s '
                             'will be ignored and deduced automatically '
                             'from type', self.type, self.executable)
        self.executable = f'{self.type}.py'
    elif self.type is None or self.type == 'other':
        self.type = 'other'
    else:
        self.misconfig_abort('Cannot determine type of this step')

    # Everything below only concerns transforms
    if not self.executable.endswith('_tf.py'):
        return

    # Ensure no log duplication for transforms
    os.environ['TRF_NOECHO'] = '1'

    # TRF_ECHO would undo the above. The two ATHENA_* variables are removed
    # because we don't use the Reco_tf auto-configuration for MP/MT
    # transforms, instead we pass our parameters of choice to athenaopts.
    for stale in ('TRF_ECHO', 'ATHENA_NPROC_NUM', 'ATHENA_CORE_NUMBER'):
        os.environ.pop(stale, None)
106
def configure_input(self):
    '''
    Validate self.input and, for keyword inputs, load the input object.

    - None is a misconfiguration (use '' for a step without input)
    - '' means the step has no input; nothing is done
    - a keyword known to is_input_defined() is loaded with get_input()
      into self.input_object
    - anything else is treated as a comma-separated list of file paths,
      each of which has to exist
    '''
    self.log.debug('Configuring input for step %s', self.name)
    if self.input is None:
        # The trailing space matters: the two literals are concatenated
        self.misconfig_abort(
            'Input not provided for this step. To configure '
            'a step without input, use an empty string')

    # Step with no input
    if len(self.input) == 0:
        return

    # Try to interpret input as keyword
    if is_input_defined(self.input):
        self.input_object = get_input(self.input)
        if self.input_object is None:
            self.misconfig_abort('Failed to load input with keyword %s', self.input)
        return

    # Try to interpret explicit paths; report the specific missing file
    for path in self.input.split(','):
        if not os.path.isfile(path):
            self.misconfig_abort('The provided input does not exist: %s', path)
    self.log.debug('Using explicit input: %s', self.input)
131
def configure_job_options(self):
    '''
    Check job options configuration.

    The check is skipped for steps of type 'other' and for steps running
    from a pickle file. Transforms must not have job options; every other
    step type must have non-empty job options.

    NOTE(review): the 'def' line of this method was absent from the listing;
    the method name is inferred from the docstring and log message -
    confirm against the repository.
    '''
    self.log.debug('Configuring job options for step %s', self.name)
    if self.type == 'other':
        self.log.debug('Skipping job options check for step.type=other')
        return
    if self.use_pickle:
        self.log.debug('Skipping job options check for step running from a pickle file')
        return

    if self.type.endswith('_tf'):
        if self.job_options is not None:
            self.misconfig_abort('Transform %s does not accept job options', self.type)
        return

    if self.job_options is None or len(self.job_options) == 0:
        self.misconfig_abort('Job options not provided for this step')
149
def add_trf_precommand(self, precommand):
    '''
    Add a preExec command to self.args.

    For the transform types the command is prefixed with a substep name
    chosen from what self.args contains; if neither marker is present the
    command is not added at all. If self.args already has a --preExec
    option, the new one takes its place and the previous value follows it.
    '''
    if self.type in ('Reco_tf', 'Trig_reco_tf', 'Derivation_tf'):
        if 'inputBS_RDOFile' in self.args:
            substep = 'BSRDOtoRAW:'
        elif 'outputRDO_TRIGFile' in self.args or 'doRDO_TRIG' in self.args:
            substep = 'RDOtoRDOTrigger:'
        else:
            self.log.debug('Skip adding precommand %s to step %s because it is a transform which '
                           'does not run Trigger', precommand, self.name)
            return
        precommand = substep + precommand

    new_option = f' --preExec "{precommand}" '

    # match --preExec, --preExec= ignoring spaces
    found = re.search(r'--preExec\s*=?\s*', self.args)
    if found is None:
        self.args += new_option
        return

    # Insert new preExec. It is important to not use the '=' sign so we
    # can chain multiple preExecs.
    before = self.args[:found.start()]
    after = self.args[found.end():]
    self.args = before + new_option + after
171
def configure_args(self, test):
    '''
    Build the final argument string of this step in self.args.

    Starting from the user-provided args, this appends (in this order) the
    athena options (imf, perfmon, malloc, config-only, threads, concurrent
    events, forks), max/skip events, the input, the job options and the
    flags. Along the way it fills in defaults for max_events and, for
    athenaHLT/athenaEF in TrigP1Test, for threads/concurrent_events/forks.

    test -- the owning test; package_name and art_type are read from it

    flags may be a list or a tuple; any other non-empty value is a
    misconfiguration. A tuple is converted to a list so that the
    automatically added flags can be appended.
    '''
    self.log.debug('Configuring args for step %s', self.name)
    if self.args is None:
        self.args = ''
    athenaopts = ''

    is_transform = self.type.endswith('_tf')
    is_hlt = self.type in ('athenaHLT', 'athenaEF')

    # Validate flags before anything gets appended to them below
    if self.flags and not isinstance(self.flags, (list, tuple)):
        self.misconfig_abort('Wrong type for flags. Expected list or tuple.')
    if isinstance(self.flags, tuple):
        self.flags = list(self.flags)
    elif not self.flags and not isinstance(self.flags, list):
        # None or another empty placeholder
        self.flags = []

    # Disable prmon for Reco_tf because it is already started inside the transform
    if self.type in ('Reco_tf', 'Derivation_tf'):
        self.prmon = False

    # Disable perfmon for multi-fork jobs as it cannot deal well with them
    if self.forks and self.forks > 1 and self.perfmon:
        self.log.debug('Disabling perfmon because forks=%d > 1', self.forks)
        self.perfmon = False
    # Disable perfmon for transforms (Reco_tf enables it itself, Trig_reco_tf would need
    # special handling depending on whether it runs athena or athenaHLT)
    if is_transform and self.perfmon:
        self.log.debug('Disabling perfmon for the transform step type %s', self.type)
        self.perfmon = False

    if self.type != 'other':
        # Append imf/perfmon/malloc
        if self.imf:
            athenaopts += ' --imf'
        if self.perfmon:
            if is_hlt:
                athenaopts += ' --perfmon'
            elif self.type == 'athena':
                athenaopts += ' --perfmon=fastmonmt'
        if self.malloc:
            athenaopts += " --stdcmalloc "

        # Enable CostMonitoring/FPEAuditor
        if self.costmon:
            self.flags.append('Trigger.CostMonitoring.monitorAllEvents=True')
        if self.fpe_auditor:
            self.flags.append('Exec.FPE=1')

    # Run config-only if requested
    if self.config_only:
        is_other = self.type == 'other'
        runs_hlt = is_hlt or (is_other and self.executable in ('athenaHLT.py', 'athenaEF.py'))
        runs_athena = (self.type in ('athena', 'Reco_tf', 'Derivation_tf')
                       or (is_other and self.executable == 'athena.py'))
        if runs_hlt:
            athenaopts += ' --dump-config-exit'
        elif runs_athena:
            athenaopts += ' --config-only=' + self.name + '.pkl'
        else:
            # No current support if it isn't clear exactly what's being run
            # This includes Trig_reco_tf and 'other' where the executable is not known
            self.misconfig_abort('Cannot determine what config-only option is needed. Consider adding the appropriate flag to "args" instead.')

    # Default threads/concurrent_events/forks (forks only for athenaHLT)
    if test.package_name == 'TrigP1Test' and is_hlt:
        if self.threads is None:
            self.threads = 1
        if self.concurrent_events is None:
            self.concurrent_events = 1
        if self.forks is None and self.type == 'athenaHLT':
            self.forks = 1

    # Append threads/concurrent_events/forks
    if self.threads is not None:
        athenaopts += ' --threads={}'.format(self.threads)
    if self.concurrent_events is not None:
        athenaopts += ' --concurrent-events={}'.format(self.concurrent_events)
    if self.forks is not None and self.type != 'athenaEF':
        athenaopts += ' --nprocs={}'.format(self.forks)

    # Append athenaopts; transforms take them as one quoted option
    athenaopts = athenaopts.strip()
    if is_transform:
        self.args += ' --athenaopts="{}"'.format(athenaopts)
    else:
        self.args += ' ' + athenaopts

    # Default max events
    if self.max_events is None:
        if test.art_type != 'build':
            self.max_events = 1000
        elif test.package_name == 'TrigP1Test':
            self.max_events = 80
        else:
            self.max_events = 20

    # Set prmon interval based on max events
    if self.prmon:
        if self.max_events <= 100:
            # NOTE(review): the assignment for this branch was missing from the
            # listing; 5 is the value recalled from the repository - confirm.
            self.prmon_interval = 5
        else:
            self.prmon_interval = 10

    # Append max/skip events; the option names depend on the step type
    if self.type == 'athena':
        max_opt, skip_opt = '--evtMax', '--skipEvents'
    elif is_hlt:
        max_opt, skip_opt = '--number-of-events', '--skip-events'
    elif is_transform:
        max_opt, skip_opt = '--maxEvents', '--skipEvents'
    else:
        max_opt = skip_opt = None
    if max_opt is not None:
        self.args += ' {}={}'.format(max_opt, self.max_events)
        if self.skip_events is not None:
            self.args += ' {}={}'.format(skip_opt, self.skip_events)

    # Append input
    if len(self.input) > 0:
        if self.input_object is None:
            input_str = self.input
        elif is_hlt:
            input_str = ' --file='.join(self.input_object.paths)
        else:
            input_str = ','.join(self.input_object.paths)

        if self.type == 'athena':
            self.args += ' --filesInput={}'.format(input_str)
        elif is_hlt:
            # One --file option per input file
            self.args += ''.join(f' --file={input_file}' for input_file in input_str.split(','))
        elif is_transform:
            if self.input_object is None:
                self.misconfig_abort(
                    'Cannot build inputXYZFile string for transform '
                    'from explicit input path. Use input=\'\' and '
                    'specify the input explicitly in args')
            if self.type == 'Trig_reco_tf' and '--prodSysBSRDO True' in self.args:
                self.args += ' --inputBS_RDOFile={}'.format(input_str)
            else:
                self.args += ' --input{}File={}'.format(
                    self.input_object.format, input_str)

    # Append job options
    if self.job_options is not None:
        self.args += ' ' + self.job_options

    # Append flags
    if self.flags:
        if not is_transform:  # athena(HLT)
            self.args += ' ' + ' '.join(self.flags)
        elif self.type == 'Trig_reco_tf':
            # for transform, set flags as pre-exec
            # No 'flags.' prefix for the trigger transform
            self.add_trf_precommand(' '.join(f'{flag}' for flag in self.flags))
        else:
            self.add_trf_precommand(';'.join(f'flags.{flag}' for flag in self.flags))

    # Strip extra whitespace
    self.args = self.args.strip()
const bool debug
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177