ATLAS Offline Software
Loading...
Searching...
No Matches
StandardTests.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2from typing import List
3
4from .Checks import AODContentCheck, AODDigestCheck, FrozenTier0PolicyCheck, MetadataCheck
5from .Inputs import input_EVNT, input_HITS, \
6 input_HITS_unfiltered, \
7 input_RDO_BKG, input_RDO_BKG_data, input_BS_minimum_bias_overlay, \
8 input_EVNT_data_overlay, input_HITS_data_overlay, \
9 input_HITS_minbias_low, input_HITS_minbias_high, input_HITS_neutrino, \
10 input_HITS_minbias_low_fulltruth, input_HITS_minbias_high_fulltruth, \
11 input_AOD
12from .Test import TestSetup, WorkflowRun, WorkflowTest, WorkflowType
13
14
16 """General workflow q-test."""
17
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
    """Configure a general Reco_tf.py q-test: command line plus output and digest checks."""
    # Default event count: pile-up reco and Run 4 use a shorter job.
    if "maxEvents" not in extra_args:
        n_events = 5 if type == WorkflowType.MCPileUpReco or run == WorkflowRun.Run4 else 20
        extra_args += f" --maxEvents {n_events}"

    # Pile-up reco needs both the signal HITS and the background RDO inputs.
    if type == WorkflowType.MCPileUpReco:
        if "inputHITSFile" not in extra_args:
            extra_args += f" --inputHITSFile {input_HITS[run]}"
        if "inputRDO_BKGFile" not in extra_args:
            extra_args += " --inputRDO_BKGFile ../run_d*/myRDO.pool.root"

    # Thread count defaults to 1, overridable from the setup; a non-positive
    # count disables the multithreaded flag entirely.
    threads = setup.custom_threads if setup.custom_threads is not None else 1
    threads_argument = '' if threads <= 0 else '--multithreaded'

    self.command = \
        (f"ATHENA_CORE_NUMBER={threads} Reco_tf.py {threads_argument} --AMIConfig {ID}"
         f" --imf False {extra_args}")

    # TODO: disable RDO comparison for now
    # if type == WorkflowType.MCReco:
    #     self.output_checks.append(FrozenTier0PolicyCheck(setup, "RDO", 10))
    self.output_checks = [
        FrozenTier0PolicyCheck(setup, "AOD", 60),
        FrozenTier0PolicyCheck(setup, "ESD", 20),
        MetadataCheck(setup, "AOD"),
    ]

    self.digest_checks = []
    if not setup.disable_output_checks:
        self.digest_checks = [AODContentCheck(setup), AODDigestCheck(setup)]

    super().__init__(ID, run, type, steps, setup)
56
57
59 """Simulation workflow test."""
60
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
    """Set up a simulation workflow test.

    Chooses the transform (Sim_tf / HITSMerge_tf / FilterHit_tf) and the default
    input file based on the workflow type, then registers HITS output checks.
    """
    if "maxEvents" not in extra_args:
        extra_args += " --maxEvents 20"

    # Fixed job number for Run 3 FullSim so the job is reproducible.
    if "jobNumber" not in extra_args and run is WorkflowRun.Run3 and type is WorkflowType.FullSim:
        extra_args += " --jobNumber 5"

    input_argument = ""
    if "inputEVNTFile" not in extra_args and "inputHITSFile" not in extra_args:
        if type is WorkflowType.HitsFilter:
            input_argument = f"--inputHITSFile {input_HITS_unfiltered[run]}"
        elif type is WorkflowType.HitsMerge:
            input_argument = f"--inputHITSFile {input_HITS[run]}"
        else:
            input_argument = f"--inputEVNTFile {input_EVNT[run]}"

    # Thread count defaults to 0 here (unlike the reco q-test, which uses 1);
    # a non-positive count disables the multithreaded flag.
    threads = 0
    threads_argument = '--multithreaded'
    if setup.custom_threads is not None:
        threads = setup.custom_threads
    if threads <= 0:
        threads_argument = ''

    if type is WorkflowType.HitsMerge:
        self.command = \
            (f"ATHENA_CORE_NUMBER={threads} HITSMerge_tf.py {threads_argument} --AMIConfig {ID}"
             f" {input_argument} --outputHITS_MRGFile myHITS.pool.root"
             f" --imf False {extra_args}")
    elif type is WorkflowType.HitsFilter:
        self.command = \
            (f"ATHENA_CORE_NUMBER={threads} FilterHit_tf.py {threads_argument} --AMIConfig {ID}"
             f" {input_argument} --outputHITS_FILTFile myHITS.pool.root"
             f" --imf False {extra_args}")
    else:
        self.command = \
            (f"ATHENA_CORE_NUMBER={threads} Sim_tf.py {threads_argument} --AMIConfig {ID}"
             f" {input_argument} --outputHITSFile myHITS.pool.root"
             f" --imf False {extra_args}")

    # NOTE(review): the list assignment below was dropped by the source
    # extraction (only the items and the closing bracket survived); restored
    # to match the sibling tests — confirm against the upstream file.
    self.output_checks = [
        FrozenTier0PolicyCheck(setup, "HITS", 10),
        MetadataCheck(setup, "HITS"),
    ]

    super().__init__(ID, run, type, steps, setup)
106
107
109 """MC overlay workflow test."""
110
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
    """Set up an MC overlay workflow test running Overlay_tf.py."""
    if "maxEvents" not in extra_args:
        extra_args += " --maxEvents 10"

    self.command = \
        (f"Overlay_tf.py --AMIConfig {ID}"
         f" --inputHITSFile {input_HITS[run]} --inputRDO_BKGFile {input_RDO_BKG[run]} --outputRDOFile myRDO.pool.root"
         f" --imf False {extra_args}")

    # skip performance checks for now
    # NOTE(review): this assignment and the output_checks opener were dropped
    # by the source extraction; restored from the sibling derivation test's
    # pattern — confirm against the upstream file.
    self.skip_performance_checks = True

    self.output_checks = [
        FrozenTier0PolicyCheck(setup, "RDO", 10),
        MetadataCheck(setup, "RDO"),
    ]

    super().__init__(ID, run, type, steps, setup)
129
130
132 """Data overlay workflow test."""
133
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
    """Set up a data overlay workflow test.

    DataOverlayChain runs FastChain_tf.py (EVNT through HITS to RDO in one
    job); any other type runs plain Overlay_tf.py on premade HITS.
    """
    if "maxEvents" not in extra_args:
        extra_args += " --maxEvents 10"

    if type is WorkflowType.DataOverlayChain:
        self.command = \
            (f"FastChain_tf.py --AMIConfig {ID}"
             f" --inputEVNTFile {input_EVNT_data_overlay[run]} --inputRDO_BKGFile {input_RDO_BKG_data[run]} --outputHITSFile myHITS.pool.root --outputRDOFile myRDO.pool.root"
             f" --imf False {extra_args}")
    else:
        self.command = \
            (f"Overlay_tf.py --AMIConfig {ID}"
             f" --inputHITSFile {input_HITS_data_overlay[run]} --inputRDO_BKGFile {input_RDO_BKG_data[run]} --outputRDOFile myRDO.pool.root"
             f" --imf False {extra_args}")

    # NOTE(review): this initialiser was dropped by the source extraction; it
    # is required by the extend() calls below.
    self.output_checks = []
    # The chained workflow additionally produces HITS, so check those too.
    if type is WorkflowType.DataOverlayChain:
        self.output_checks.extend([
            FrozenTier0PolicyCheck(setup, "HITS", 10),
            MetadataCheck(setup, "HITS"),
        ])

    self.output_checks.extend([
        FrozenTier0PolicyCheck(setup, "RDO", 10),
        MetadataCheck(setup, "RDO"),
    ])

    super().__init__(ID, run, type, steps, setup)
162
163
165 """Digitization with pile-up workflow test."""
166
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
    """Set up a digitization-with-pile-up test running Digi_tf.py."""
    if "maxEvents" not in extra_args:
        extra_args += " --maxEvents 5"

    # Full-truth minimum-bias inputs are used when in-time-only truth is requested.
    if "StandardInTimeOnlyTruth" in extra_args:
        input_high = input_HITS_minbias_high_fulltruth[run]
        input_low = input_HITS_minbias_low_fulltruth[run]
    else:
        input_high = input_HITS_minbias_high[run]
        input_low = input_HITS_minbias_low[run]

    # Fixed job number and seed offsets keep the digitization reproducible.
    self.command = \
        (f"Digi_tf.py --AMIConfig {ID} --jobNumber 1 --digiSeedOffset1 1 --digiSeedOffset2 1"
         f" --inputHITSFile {input_HITS_neutrino[run]} --inputHighPtMinbiasHitsFile {input_high} --inputLowPtMinbiasHitsFile {input_low} --outputRDOFile myRDO.pool.root"
         f" --imf False {extra_args}")

    # NOTE(review): the list assignment below was dropped by the source
    # extraction (only the items and the closing bracket survived); restored
    # to match the sibling tests — confirm against the upstream file.
    self.output_checks = [
        FrozenTier0PolicyCheck(setup, "RDO", 5),
        MetadataCheck(setup, "RDO"),
    ]

    super().__init__(ID, run, type, steps, setup)
189
190
192 """Data overlay minimum bias preprocessing test."""
193
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
    """Configure the data-overlay minimum-bias preprocessing step (Overlay_tf.py)."""
    if "maxEvents" not in extra_args:
        extra_args += " --maxEvents 5"

    self.command = (
        f"Overlay_tf.py --AMIConfig {ID}"
        f" --inputBSFile {input_BS_minimum_bias_overlay[run]} --outputRDO_BKGFile myRDO_BKG.pool.root"
        f" --imf False {extra_args}"
    )

    # TODO: enable once the output stabilises
    # self.output_checks = [
    #     FrozenTier0PolicyCheck(setup, "RDO_BKG", 5),
    #     MetadataCheck(setup, "RDO_BKG"),
    # ]

    super().__init__(ID, run, type, steps, setup)
210
211
213 """Derivations test."""
214
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
    """Set up a derivation (DAOD production) test running Derivation_tf.py.

    The test ID encodes the input data type and the DAOD formats,
    ID = "<dataType>_<FORMAT>[_<FORMAT>...]_<suffix>"; the first token is the
    data type, the middle tokens are the formats.
    """
    test_def = ID.split("_")
    data_type = test_def[0].lower()
    formats = [fmt.upper() for fmt in test_def[1:-1]]  # 'fmt' avoids shadowing builtin format()

    threads = 0
    if setup.custom_threads is not None:
        threads = setup.custom_threads

    if "maxEvents" not in extra_args:
        # One event beyond a full flush per worker, so every output tree
        # gets at least one flush plus a partial one.
        base_events = 100
        events = threads * base_events + 1
        flush = 80

        extra_args += f" --maxEvents {events}"
        format_flush = ", ".join([f"\"DAOD_{fmt}\": {flush}" for fmt in formats])
        extra_args += f" --preExec 'flags.Output.TreeAutoFlush={{{format_flush}}}'"

    if "inputAODFile" not in extra_args:
        extra_args += f" --inputAODFile {input_AOD[run][data_type]}"

    # could also use p5503
    self.command = \
        (f"ATHENA_CORE_NUMBER={threads} Derivation_tf.py"
         f" --formats {' '.join(formats)}"
         " --multiprocess --multithreadedFileValidation True"
         " --athenaMPMergeTargetSize 'DAOD_*:0'"
         " --sharedWriter True"
         " --outputDAODFile myOutput.pool.root"
         f" --imf False {extra_args}")

    # skip performance checks for now
    # NOTE(review): the two assignments below were dropped by the source
    # extraction; restored from the sibling tests' pattern — confirm upstream.
    self.skip_performance_checks = True

    self.output_checks = []
    for fmt in formats:
        self.output_checks.append(FrozenTier0PolicyCheck(setup, f"DAOD_{fmt}", 10))
        self.output_checks.append(MetadataCheck(setup, f"DAOD_{fmt}"))

    super().__init__(ID, run, type, steps, setup)
255
257 """Derivations test with AthenaMT"""
258
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
    """Set up a derivation test running Derivation_tf.py with AthenaMT (one thread)."""
    test_def = ID.split("_")
    data_type = test_def[0].lower()
    formats = [fmt.upper() for fmt in test_def[1:-1]]  # 'fmt' avoids shadowing builtin format()
    # The marker that selected this MT variant must not leak into the transform args.
    extra_args = extra_args.replace("mtDerivation", "")

    threads = 0
    if setup.custom_threads is not None:
        threads = setup.custom_threads

    if "maxEvents" not in extra_args:
        events = 10
        extra_args += f" --maxEvents {events}"

    if "inputAODFile" not in extra_args:
        extra_args += f" --inputAODFile {input_AOD[run][data_type]}"

    self.command = \
        (f"ATHENA_CORE_NUMBER={threads} Derivation_tf.py"
         f" --athenaopts='--threads=1'"
         f" --formats {' '.join(formats)}"
         " --outputDAODFile myOutput.pool.root"
         f" --imf False {extra_args}")

    # skip performance checks for now
    # NOTE(review): the assignment below was dropped by the source extraction;
    # restored from the sibling derivation test — confirm upstream.
    self.skip_performance_checks = True

    super().__init__(ID, run, type, steps, setup)
287
288
290 """Generation test."""
291
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
    """Configure an event-generation test running Gen_tf.py."""
    if "maxEvents" not in extra_args:
        extra_args += " --maxEvents 10"

    # Centre-of-mass energy follows the run period unless given explicitly.
    if "ecmEnergy" not in extra_args:
        if run is WorkflowRun.Run2:
            ecm = 13000
        elif run is WorkflowRun.Run3:
            ecm = 13600
        else:
            ecm = 14000
        extra_args += f" --ecmEnergy {ecm}"

    # The job-configuration DSID is the test ID with the "gen" prefix removed.
    dsid = ID.replace("gen", "")

    self.command = (
        f"Gen_tf.py --jobConfig {dsid}"
        " --outputEVNTFile myEVNT.pool.root"
        f" --imf False {extra_args}"
    )

    super().__init__(ID, run, type, steps, setup)
None __init__(self, str ID, WorkflowRun run, WorkflowType type, List[str] steps, TestSetup setup, str extra_args="")
None __init__(self, str ID, WorkflowRun run, WorkflowType type, List[str] steps, TestSetup setup, str extra_args="")
None __init__(self, str ID, WorkflowRun run, WorkflowType type, List[str] steps, TestSetup setup, str extra_args="")
None __init__(self, str ID, WorkflowRun run, WorkflowType type, List[str] steps, TestSetup setup, str extra_args="")
None __init__(self, str ID, WorkflowRun run, WorkflowType type, List[str] steps, TestSetup setup, str extra_args="")
None __init__(self, str ID, WorkflowRun run, WorkflowType type, List[str] steps, TestSetup setup, str extra_args="")
None __init__(self, str ID, WorkflowRun run, WorkflowType type, List[str] steps, TestSetup setup, str extra_args="")
None __init__(self, str ID, WorkflowRun run, WorkflowType type, List[str] steps, TestSetup setup, str extra_args="")
None __init__(self, str ID, WorkflowRun run, WorkflowType type, List[str] steps, TestSetup setup, str extra_args="")