ATLAS Offline Software — StandardTests.py (extracted from the generated documentation page for this file).
1# Copyright (C) 2002-2026 CERN for the benefit of the ATLAS collaboration
2from typing import List
3
4from .Checks import AODContentCheck, AODDigestCheck, FrozenTier0PolicyCheck, MetadataCheck
5from .Inputs import input_EVNT, input_HITS, \
6 input_HITS_unfiltered, \
7 input_RDO_BKG, input_RDO_BKG_data, input_BS_minimum_bias_overlay, \
8 input_EVNT_data_overlay, input_HITS_data_overlay, \
9 input_HITS_minbias_low, input_HITS_minbias_high, input_HITS_neutrino, \
10 input_HITS_minbias_low_fulltruth, input_HITS_minbias_high_fulltruth, \
11 input_AOD
12from .Test import TestSetup, WorkflowRun, WorkflowTest, WorkflowType
13
14
16 """General workflow q-test."""
17
18 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
19 if "maxEvents" not in extra_args:
20 if type == WorkflowType.MCPileUpReco or run == WorkflowRun.Run4:
21 extra_args += " --maxEvents 5"
22 else:
23 extra_args += " --maxEvents 20"
24
25 if type == WorkflowType.MCPileUpReco:
26 if "inputHITSFile" not in extra_args:
27 extra_args += f" --inputHITSFile {input_HITS[run]}"
28 if "inputRDO_BKGFile" not in extra_args:
29 extra_args += " --inputRDO_BKGFile ../run_d*/myRDO.pool.root"
30
31 threads = 1
32 threads_argument = '--multithreaded'
33 if setup.custom_threads is not None:
34 threads = setup.custom_threads
35 if threads <= 0:
36 threads_argument = ''
37
38 self.command = \
39 (f"ATHENA_CORE_NUMBER={threads} Reco_tf.py {threads_argument} --AMIConfig {ID}"
40 f" --imf False {extra_args}")
41
42 self.output_checks = []
43 # TODO: disable RDO comparison for now
44 # if type == WorkflowType.MCReco:
45 # self.output_checks.append(FrozenTier0PolicyCheck(setup, "RDO", 10))
46 self.output_checks.append(FrozenTier0PolicyCheck(setup, "AOD", 60))
47 self.output_checks.append(FrozenTier0PolicyCheck(setup, "ESD", 20))
48 self.output_checks.append(MetadataCheck(setup, "AOD"))
49
50 self.digest_checks = []
51 if not setup.disable_output_checks:
52 self.digest_checks.append(AODContentCheck(setup))
53 self.digest_checks.append(AODDigestCheck(setup))
54
55 super().__init__(ID, run, type, steps, setup)
56
57
59 """Simulation workflow test."""
60
61 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
62 if "maxEvents" not in extra_args:
63 extra_args += " --maxEvents 20"
64
65 if "jobNumber" not in extra_args and run is WorkflowRun.Run3 and type is WorkflowType.FullSim:
66 extra_args += " --jobNumber 5"
67
68 input_argument = ""
69 if "inputEVNTFile" not in extra_args and "inputHITSFile" not in extra_args:
70 if type is WorkflowType.HitsFilter:
71 input_argument = f"--inputHITSFile {input_HITS_unfiltered[run]}"
72 elif type is WorkflowType.HitsMerge:
73 input_argument = f"--inputHITSFile {input_HITS[run]}"
74 else:
75 input_argument = f"--inputEVNTFile {input_EVNT[run]}"
76
77 threads = 0
78 threads_argument = '--multithreaded'
79 if setup.custom_threads is not None:
80 threads = setup.custom_threads
81 if threads <= 0:
82 threads_argument = ''
83
84 if type is WorkflowType.HitsMerge:
85 self.command = \
86 (f"ATHENA_CORE_NUMBER={threads} HITSMerge_tf.py {threads_argument} --AMIConfig {ID}"
87 f" {input_argument} --outputHITS_MRGFile myHITS.pool.root"
88 f" --imf False {extra_args}")
89 elif type is WorkflowType.HitsFilter:
90 self.command = \
91 (f"ATHENA_CORE_NUMBER={threads} FilterHit_tf.py {threads_argument} --AMIConfig {ID}"
92 f" {input_argument} --outputHITS_FILTFile myHITS.pool.root"
93 f" --imf False {extra_args}")
94 else:
95 self.command = \
96 (f"ATHENA_CORE_NUMBER={threads} Sim_tf.py {threads_argument} --AMIConfig {ID}"
97 f" {input_argument} --outputHITSFile myHITS.pool.root"
98 f" --imf False {extra_args}")
99
101 FrozenTier0PolicyCheck(setup, "HITS", 10),
102 MetadataCheck(setup, "HITS"),
103 ]
104
105 super().__init__(ID, run, type, steps, setup)
106
107
109 """MC overlay workflow test."""
110
111 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
112 if "maxEvents" not in extra_args:
113 extra_args += " --maxEvents 10"
114
115 self.command = \
116 (f"Overlay_tf.py --AMIConfig {ID}"
117 f" --inputHITSFile {input_HITS[run]} --inputRDO_BKGFile {input_RDO_BKG[run]} --outputRDOFile myRDO.pool.root"
118 f" --imf False {extra_args}")
119
120 # skip performance checks for now
122
124 FrozenTier0PolicyCheck(setup, "RDO", 10),
125 MetadataCheck(setup, "RDO"),
126 ]
127
128 super().__init__(ID, run, type, steps, setup)
129
130
132 """Data overlay workflow test."""
133
134 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
135 if "maxEvents" not in extra_args:
136 extra_args += " --maxEvents 10"
137
138 if type is WorkflowType.DataOverlayChain:
139 self.command = \
140 (f"FastChain_tf.py --AMIConfig {ID}"
141 f" --inputEVNTFile {input_EVNT_data_overlay[run]} --inputRDO_BKGFile {input_RDO_BKG_data[run]} --outputHITSFile myHITS.pool.root --outputRDOFile myRDO.pool.root"
142 f" --imf False {extra_args}")
143 else:
144 self.command = \
145 (f"Overlay_tf.py --AMIConfig {ID}"
146 f" --inputHITSFile {input_HITS_data_overlay[run]} --inputRDO_BKGFile {input_RDO_BKG_data[run]} --outputRDOFile myRDO.pool.root"
147 f" --imf False {extra_args}")
148
150 if type is WorkflowType.DataOverlayChain:
151 self.output_checks.extend([
152 FrozenTier0PolicyCheck(setup, "HITS", 10),
153 MetadataCheck(setup, "HITS"),
154 ])
155
156 self.output_checks.extend([
157 FrozenTier0PolicyCheck(setup, "RDO", 10),
158 MetadataCheck(setup, "RDO"),
159 ])
160
161 super().__init__(ID, run, type, steps, setup)
162
163
165 """Digitization with pile-up workflow test."""
166
167 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
168 if "maxEvents" not in extra_args:
169 extra_args += " --maxEvents 5"
170
171 if "StandardInTimeOnlyTruth" in extra_args:
172 input_high = input_HITS_minbias_high_fulltruth[run]
173 input_low = input_HITS_minbias_low_fulltruth[run]
174 else:
175 input_high = input_HITS_minbias_high[run]
176 input_low = input_HITS_minbias_low[run]
177
178 self.command = \
179 (f"Digi_tf.py --AMIConfig {ID} --jobNumber 1 --digiSeedOffset1 1 --digiSeedOffset2 1"
180 f" --inputHITSFile {input_HITS_neutrino[run]} --inputHighPtMinbiasHitsFile {input_high} --inputLowPtMinbiasHitsFile {input_low} --outputRDOFile myRDO.pool.root"
181 f" --imf False {extra_args}")
182
184 FrozenTier0PolicyCheck(setup, "RDO", 5),
185 MetadataCheck(setup, "RDO"),
186 ]
187
188 super().__init__(ID, run, type, steps, setup)
189
190
192 """Data overlay minimum bias preprocessing test."""
193
194 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
195 if "maxEvents" not in extra_args:
196 extra_args += " --maxEvents 5"
197
198 self.command = \
199 (f"Overlay_tf.py --AMIConfig {ID}"
200 f" --inputBSFile {input_BS_minimum_bias_overlay[run]} --outputRDO_BKGFile myRDO_BKG.pool.root"
201 f" --imf False {extra_args}")
202
203 # TODO: enable once the output stabilises
204 # self.output_checks = [
205 # FrozenTier0PolicyCheck(setup, "RDO_BKG", 5),
206 # MetadataCheck(setup, "RDO_BKG"),
207 # ]
208
209 super().__init__(ID, run, type, steps, setup)
210
211
213 """Derivations test."""
214
215 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
216 test_def = ID.split("_")
217 multithreaded = False
218 if test_def[-2] == "MT":
219 multithreaded = True
220 test_run = test_def.pop()
221 test_def.pop() # remove MT
222 test_def.append(test_run) # add run back to the end
223 data_type = test_def[0].lower()
224 formats = [format.upper() for format in test_def[1:-1]]
225
226 threads = 0
227 if setup.custom_threads is not None:
228 threads = setup.custom_threads
229
230 if "maxEvents" not in extra_args:
231 base_events = 100
232 events = threads * base_events + 1
233 extra_args += f" --maxEvents {events}"
234
235 if not multithreaded:
236 flush = 80
237 format_flush = ", ".join([f"\"DAOD_{format}\": {flush}" for format in formats])
238 extra_args += f" --preExec 'flags.Output.TreeAutoFlush={{{format_flush}}}'"
239
240 if "inputAODFile" not in extra_args and "inputDAOD_PHYSFile" not in extra_args:
241 extra_args += f" --inputAODFile {input_AOD[run][data_type]}"
242
243 # could also use p5503
244 if not multithreaded:
245 self.command = \
246 (f"ATHENA_CORE_NUMBER={threads} Derivation_tf.py"
247 f" --formats {' '.join(formats)}"
248 " --multiprocess --multithreadedFileValidation True"
249 " --athenaMPMergeTargetSize 'DAOD_*:0'"
250 " --sharedWriter True"
251 " --outputDAODFile myOutput.pool.root"
252 f" --imf False {extra_args}")
253 else:
254 self.command = \
255 (f"ATHENA_CORE_NUMBER={threads} Derivation_tf.py"
256 f" --formats {' '.join(formats)}"
257 " --outputDAODFile myOutput.pool.root"
258 f" --imf False {extra_args}")
259
260 # skip performance checks for now
262
264 for format in formats:
265 self.output_checks.append(FrozenTier0PolicyCheck(setup, f"DAOD_{format}", 10))
266 self.output_checks.append(MetadataCheck(setup, f"DAOD_{format}"))
267
268 super().__init__(ID, run, type, steps, setup)
269
270
272 """Generation test."""
273
274 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "") -> None:
275 if "maxEvents" not in extra_args:
276 extra_args += " --maxEvents 10"
277
278 if "ecmEnergy" not in extra_args:
279 if run is WorkflowRun.Run2:
280 extra_args += " --ecmEnergy 13000"
281 elif run is WorkflowRun.Run3:
282 extra_args += " --ecmEnergy 13600"
283 else:
284 extra_args += " --ecmEnergy 14000"
285
286 dsid = ID.replace("gen", "")
287
288 self.command = \
289 (f"Gen_tf.py --jobConfig {dsid}"
290 " --outputEVNTFile myEVNT.pool.root"
291 f" --imf False {extra_args}")
292
293 super().__init__(ID, run, type, steps, setup)
Member index (generated): each test class above defines __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = "").