2 from typing
import List
4 from .Checks
import AODContentCheck, AODDigestCheck, FrozenTier0PolicyCheck, MetadataCheck
5 from .Inputs
import input_EVNT, input_HITS, \
6 input_HITS_unfiltered, \
7 input_RDO_BKG, input_RDO_BKG_data, input_BS_minimum_bias_overlay, \
8 input_EVNT_data_overlay, input_HITS_data_overlay, \
9 input_HITS_minbias_low, input_HITS_minbias_high, input_HITS_neutrino, \
10 input_HITS_minbias_low_fulltruth, input_HITS_minbias_high_fulltruth, \
12 from .Test
import TestSetup, WorkflowRun, WorkflowTest, WorkflowType
16 """General workflow q-test."""
18 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
19 if "maxEvents" not in extra_args:
20 if type == WorkflowType.MCPileUpReco
or run == WorkflowRun.Run4:
21 extra_args +=
" --maxEvents 5"
23 extra_args +=
" --maxEvents 20"
25 if type == WorkflowType.MCPileUpReco:
26 if "inputHITSFile" not in extra_args:
27 extra_args += f
" --inputHITSFile {input_HITS[run]}"
28 if "inputRDO_BKGFile" not in extra_args:
29 extra_args +=
" --inputRDO_BKGFile ../run_d*/myRDO.pool.root"
32 threads_argument =
'--multithreaded'
33 if setup.custom_threads
is not None:
34 threads = setup.custom_threads
39 (f
"ATHENA_CORE_NUMBER={threads} Reco_tf.py {threads_argument} --AMIConfig {ID}"
40 f
" --imf False {extra_args}")
51 if not setup.disable_output_checks:
55 super().
__init__(ID, run, type, steps, setup)
59 """Simulation workflow test."""
61 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
62 if "maxEvents" not in extra_args:
63 extra_args +=
" --maxEvents 20"
65 if "jobNumber" not in extra_args
and run
is WorkflowRun.Run3
and type
is WorkflowType.FullSim:
66 extra_args +=
" --jobNumber 5"
69 if "inputEVNTFile" not in extra_args
and "inputHITSFile" not in extra_args:
70 if type
is WorkflowType.HitsFilter:
71 input_argument = f
"--inputHITSFile {input_HITS_unfiltered[run]}"
72 elif type
is WorkflowType.HitsMerge:
73 input_argument = f
"--inputHITSFile {input_HITS[run]}"
75 input_argument = f
"--inputEVNTFile {input_EVNT[run]}"
78 threads_argument =
'--multithreaded'
79 if setup.custom_threads
is not None:
80 threads = setup.custom_threads
84 if type
is WorkflowType.HitsMerge:
86 (f
"ATHENA_CORE_NUMBER={threads} HITSMerge_tf.py {threads_argument} --AMIConfig {ID}"
87 f
" {input_argument} --outputHITS_MRGFile myHITS.pool.root"
88 f
" --imf False {extra_args}")
89 elif type
is WorkflowType.HitsFilter:
91 (f
"ATHENA_CORE_NUMBER={threads} FilterHit_tf.py {threads_argument} --AMIConfig {ID}"
92 f
" {input_argument} --outputHITS_FILTFile myHITS.pool.root"
93 f
" --imf False {extra_args}")
96 (f
"ATHENA_CORE_NUMBER={threads} Sim_tf.py {threads_argument} --AMIConfig {ID}"
97 f
" {input_argument} --outputHITSFile myHITS.pool.root"
98 f
" --imf False {extra_args}")
105 super().
__init__(ID, run, type, steps, setup)
109 """MC overlay workflow test."""
111 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
112 if "maxEvents" not in extra_args:
113 extra_args +=
" --maxEvents 10"
116 (f
"Overlay_tf.py --AMIConfig {ID}"
117 f
" --inputHITSFile {input_HITS[run]} --inputRDO_BKGFile {input_RDO_BKG[run]} --outputRDOFile myRDO.pool.root"
118 f
" --imf False {extra_args}")
128 super().
__init__(ID, run, type, steps, setup)
132 """Data overlay workflow test."""
134 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
135 if "maxEvents" not in extra_args:
136 extra_args +=
" --maxEvents 10"
138 if type
is WorkflowType.DataOverlayChain:
140 (f
"FastChain_tf.py --AMIConfig {ID}"
141 f
" --inputEVNTFile {input_EVNT_data_overlay[run]} --inputRDO_BKGFile {input_RDO_BKG_data[run]} --outputHITSFile myHITS.pool.root --outputRDOFile myRDO.pool.root"
142 f
" --imf False {extra_args}")
145 (f
"Overlay_tf.py --AMIConfig {ID}"
146 f
" --inputHITSFile {input_HITS_data_overlay[run]} --inputRDO_BKGFile {input_RDO_BKG_data[run]} --outputRDOFile myRDO.pool.root"
147 f
" --imf False {extra_args}")
150 if type
is WorkflowType.DataOverlayChain:
161 super().
__init__(ID, run, type, steps, setup)
165 """Digitization with pile-up workflow test."""
167 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
168 if "maxEvents" not in extra_args:
169 extra_args +=
" --maxEvents 5"
171 if "StandardInTimeOnlyTruth" in extra_args:
172 input_high = input_HITS_minbias_high_fulltruth[run]
173 input_low = input_HITS_minbias_low_fulltruth[run]
175 input_high = input_HITS_minbias_high[run]
176 input_low = input_HITS_minbias_low[run]
179 (f
"Digi_tf.py --AMIConfig {ID} --jobNumber 1 --digiSeedOffset1 1 --digiSeedOffset2 1"
180 f
" --inputHITSFile {input_HITS_neutrino[run]} --inputHighPtMinbiasHitsFile {input_high} --inputLowPtMinbiasHitsFile {input_low} --outputRDOFile myRDO.pool.root"
181 f
" --imf False {extra_args}")
188 super().
__init__(ID, run, type, steps, setup)
192 """Data overlay minimum bias preprocessing test."""
194 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
195 if "maxEvents" not in extra_args:
196 extra_args +=
" --maxEvents 5"
199 (f
"Overlay_tf.py --AMIConfig {ID}"
200 f
" --inputBSFile {input_BS_minimum_bias_overlay[run]} --outputRDO_BKGFile myRDO_BKG.pool.root"
201 f
" --imf False {extra_args}")
209 super().
__init__(ID, run, type, steps, setup)
213 """Derivations test."""
215 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
216 test_def = ID.split(
"_")
217 data_type = test_def[0].lower()
218 formats = [format.upper()
for format
in test_def[1:-1]]
221 if setup.custom_threads
is not None:
222 threads = setup.custom_threads
224 if "maxEvents" not in extra_args:
226 events = threads * base_events + 1
229 extra_args += f
" --maxEvents {events}"
230 format_flush =
", ".
join([f
"\"DAOD_{format}\": {flush}" for format
in formats])
231 extra_args += f
" --preExec 'flags.Output.TreeAutoFlush={{{format_flush}}}'"
233 if "inputAODFile" not in extra_args:
234 extra_args += f
" --inputAODFile {input_AOD[run][data_type]}"
238 (f
"ATHENA_CORE_NUMBER={threads} Derivation_tf.py"
239 f
" --formats {' '.join(formats)}"
240 " --multiprocess --multithreadedFileValidation True"
241 " --athenaMPMergeTargetSize 'DAOD_*:0'"
242 " --sharedWriter True"
243 " --outputDAODFile myOutput.pool.root"
244 f
" --imf False {extra_args}")
250 for format
in formats:
254 super().
__init__(ID, run, type, steps, setup)
258 """Generation test."""
260 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
261 if "maxEvents" not in extra_args:
262 extra_args +=
" --maxEvents 10"
264 if "ecmEnergy" not in extra_args:
265 if run
is WorkflowRun.Run2:
266 extra_args +=
" --ecmEnergy 13000"
267 elif run
is WorkflowRun.Run3:
268 extra_args +=
" --ecmEnergy 13600"
270 extra_args +=
" --ecmEnergy 14000"
272 dsid = ID.replace(
"gen",
"")
275 (f
"Gen_tf.py --jobConfig {dsid}"
276 " --outputEVNTFile myEVNT.pool.root"
277 f
" --imf False {extra_args}")
279 super().
__init__(ID, run, type, steps, setup)