2 from typing
import List
4 from .Checks
import AODContentCheck, AODDigestCheck, FrozenTier0PolicyCheck, MetadataCheck
5 from .Inputs
import input_EVNT, input_HITS, \
6 input_HITS_unfiltered, \
7 input_RDO_BKG, input_BS_minimum_bias_overlay, \
8 input_HITS_data_overlay, input_BS_SKIM, \
9 input_HITS_minbias_low, input_HITS_minbias_high, input_HITS_neutrino, \
10 input_HITS_minbias_low_fulltruth, input_HITS_minbias_high_fulltruth, \
12 from .Test
import TestSetup, WorkflowRun, WorkflowTest, WorkflowType
16 """General workflow q-test."""
18 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
# Puts together a Reco_tf.py command string for test `ID` and finally passes
# (ID, run, type, steps, setup) on to the parent initialiser.
# `extra_args` holds extra transform options; defaults are appended to it
# below only when the option name does not already occur in the string
# (plain substring test, so any occurrence of the name suppresses the default).
# NOTE(review): the original line numbers embedded in this text skip values
# (22, 24, 30-31, 35-38, 41-50, 52-54), so some statements of this method
# are not visible here -- confirm the details against the complete file.
# Default event count: 5 for MCPileUpReco tests or Run4.
19 if "maxEvents" not in extra_args:
20 if type == WorkflowType.MCPileUpReco
or run == WorkflowRun.Run4:
21 extra_args +=
" --maxEvents 5"
# Presumably the `else` branch (original line 22 is not visible): 20 events.
23 extra_args +=
" --maxEvents 20"
# MCPileUpReco needs explicit inputs: the HITS file registered for `run`,
# and a background RDO matched by the relative glob ../run_d*/myRDO.pool.root.
25 if type == WorkflowType.MCPileUpReco:
26 if "inputHITSFile" not in extra_args:
27 extra_args += f
" --inputHITSFile {input_HITS[run]}"
28 if "inputRDO_BKGFile" not in extra_args:
29 extra_args +=
" --inputRDO_BKGFile ../run_d*/myRDO.pool.root"
32 threads_argument =
'--multithreaded'
# NOTE(review): in the visible text `threads` is bound only inside this
# branch, yet the command string below always reads it; original lines
# 30-31 and 35-36 are not visible, so a default is presumably set there --
# TODO confirm.
33 if setup.custom_threads
is not None:
34 threads = setup.custom_threads
# Command string: thread count goes in via the ATHENA_CORE_NUMBER environment
# variable, the accumulated options via {extra_args}.
# NOTE(review): the assignment target of this expression is not visible
# (original lines 37-38 missing) -- verify where the string is stored.
39 (f
"ATHENA_CORE_NUMBER={threads} Reco_tf.py {threads_argument} --AMIConfig {ID}"
40 f
" --imf False {extra_args}")
# Runs only when setup.disable_output_checks is falsy; the guarded statements
# (original lines 52-54) are not visible in this view.
51 if not setup.disable_output_checks:
55 super().
__init__(ID, run, type, steps, setup)
59 """Simulation workflow test."""
61 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
# Puts together the command string for a simulation-type test and finally
# passes (ID, run, type, steps, setup) on to the parent initialiser.
# Depending on `type`, the transform is HITSMerge_tf.py, FilterHit_tf.py or
# Sim_tf.py; all three write myHITS.pool.root.
# NOTE(review): the embedded original line numbers skip values (e.g. 64,
# 67-68, 74, 76-77, 81-83, 85, 90, 94-95, 99-104), so some statements are
# not visible here -- confirm the details against the complete file.
# Default event count when the caller did not pass one.
62 if "maxEvents" not in extra_args:
63 extra_args +=
" --maxEvents 20"
# A job number of 5 is added only for Run3 full simulation.
65 if "jobNumber" not in extra_args
and run
is WorkflowRun.Run3
and type
is WorkflowType.FullSim:
66 extra_args +=
" --jobNumber 5"
# Choose the input only if the caller supplied neither an EVNT nor a HITS file.
# NOTE(review): `input_argument` is bound only inside this guard in the
# visible text but is read unconditionally in the command strings below;
# a default is presumably set on a line not shown (67-68) -- TODO confirm.
69 if "inputEVNTFile" not in extra_args
and "inputHITSFile" not in extra_args:
# Hit filtering reads the unfiltered HITS sample for `run`.
70 if type
is WorkflowType.HitsFilter:
71 input_argument = f
"--inputHITSFile {input_HITS_unfiltered[run]}"
# Hit merging reads the regular HITS sample for `run`.
72 elif type
is WorkflowType.HitsMerge:
73 input_argument = f
"--inputHITSFile {input_HITS[run]}"
# Presumably the `else` branch (original line 74 is not visible): all other
# types start from the EVNT sample for `run`.
75 input_argument = f
"--inputEVNTFile {input_EVNT[run]}"
78 threads_argument =
'--multithreaded'
# NOTE(review): `threads` is bound only here in the visible text; original
# lines 76-77 and 81-83 are missing, so a default is presumably set there.
79 if setup.custom_threads
is not None:
80 threads = setup.custom_threads
# Pick the transform by workflow type. The assignment targets of the three
# command strings (original lines 85, 90, 95) are not visible -- verify
# where the string is stored.
84 if type
is WorkflowType.HitsMerge:
86 (f
"ATHENA_CORE_NUMBER={threads} HITSMerge_tf.py {threads_argument} --AMIConfig {ID}"
87 f
" {input_argument} --outputHITS_MRGFile myHITS.pool.root"
88 f
" --imf False {extra_args}")
89 elif type
is WorkflowType.HitsFilter:
91 (f
"ATHENA_CORE_NUMBER={threads} FilterHit_tf.py {threads_argument} --AMIConfig {ID}"
92 f
" {input_argument} --outputHITS_FILTFile myHITS.pool.root"
93 f
" --imf False {extra_args}")
# Presumably the `else` branch (original line 94 is not visible): Sim_tf.py.
96 (f
"ATHENA_CORE_NUMBER={threads} Sim_tf.py {threads_argument} --AMIConfig {ID}"
97 f
" {input_argument} --outputHITSFile myHITS.pool.root"
98 f
" --imf False {extra_args}")
105 super().
__init__(ID, run, type, steps, setup)
109 """MC overlay workflow test."""
111 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
# Puts together an Overlay_tf.py command string that overlays the HITS
# sample for `run` onto the background RDO sample for `run`, writing
# myRDO.pool.root, then passes (ID, run, type, steps, setup) on to the
# parent initialiser.
# NOTE(review): the embedded original line numbers skip values (114-115,
# 119-127), so some statements are not visible here -- confirm the details
# against the complete file.
# Default event count when the caller did not pass one (substring test).
112 if "maxEvents" not in extra_args:
113 extra_args +=
" --maxEvents 10"
# NOTE(review): the assignment target of this expression (original line
# 115) is not visible -- verify where the string is stored.
116 (f
"Overlay_tf.py --AMIConfig {ID}"
117 f
" --inputHITSFile {input_HITS[run]} --inputRDO_BKGFile {input_RDO_BKG[run]} --outputRDOFile myRDO.pool.root"
118 f
" --imf False {extra_args}")
128 super().
__init__(ID, run, type, steps, setup)
132 """Data overlay workflow test."""
134 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
# Puts together an Overlay_tf.py command string for overlay onto data: the
# data-overlay HITS sample and the skimmed bytestream sample for `run` are
# the inputs, myRDO.pool.root is the output. Then passes
# (ID, run, type, steps, setup) on to the parent initialiser.
# NOTE(review): the embedded original line numbers skip values (137-138,
# 143-148), so some statements are not visible here -- confirm the details
# against the complete file.
# Default event count when the caller did not pass one (substring test).
135 if "maxEvents" not in extra_args:
136 extra_args +=
" --maxEvents 10"
# The --triggerConfig piece is a plain literal (no placeholders), so it is
# always emitted as 'Overlay=NONE'.
# NOTE(review): the assignment target of this expression (original line
# 138) is not visible -- verify where the string is stored.
139 (f
"Overlay_tf.py --AMIConfig {ID}"
140 f
" --inputHITSFile {input_HITS_data_overlay[run]} --inputBS_SKIMFile {input_BS_SKIM[run]} --outputRDOFile myRDO.pool.root"
141 " --triggerConfig 'Overlay=NONE'"
142 f
" --imf False {extra_args}")
149 super().
__init__(ID, run, type, steps, setup)
153 """Digitization with pile-up workflow test."""
155 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
# Puts together a Digi_tf.py command string that digitises the neutrino
# HITS sample for `run` with high-pT and low-pT minimum-bias HITS inputs,
# writing myRDO.pool.root, then passes (ID, run, type, steps, setup) on to
# the parent initialiser.
# NOTE(review): the embedded original line numbers skip values (158, 162,
# 165-166, 170-175), so some statements are not visible here -- confirm the
# details against the complete file.
# Default event count when the caller did not pass one (substring test).
156 if "maxEvents" not in extra_args:
157 extra_args +=
" --maxEvents 5"
# If the extra arguments mention StandardInTimeOnlyTruth, switch to the
# "fulltruth" variants of the minimum-bias samples.
159 if "StandardInTimeOnlyTruth" in extra_args:
160 input_high = input_HITS_minbias_high_fulltruth[run]
161 input_low = input_HITS_minbias_low_fulltruth[run]
# Presumably the `else` branch (original line 162 is not visible): the
# regular minimum-bias samples.
163 input_high = input_HITS_minbias_high[run]
164 input_low = input_HITS_minbias_low[run]
# Job number and both digitisation seed offsets are fixed to 1 in the
# command text itself.
# NOTE(review): the assignment target of this expression (original line
# 166) is not visible -- verify where the string is stored.
167 (f
"Digi_tf.py --AMIConfig {ID} --jobNumber 1 --digiSeedOffset1 1 --digiSeedOffset2 1"
168 f
" --inputHITSFile {input_HITS_neutrino[run]} --inputHighPtMinbiasHitsFile {input_high} --inputLowPtMinbiasHitsFile {input_low} --outputRDOFile myRDO.pool.root"
169 f
" --imf False {extra_args}")
176 super().
__init__(ID, run, type, steps, setup)
180 """Data overlay minimum bias preprocessing test."""
182 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
# Puts together an Overlay_tf.py command string that reads the
# minimum-bias-overlay bytestream sample for `run` and writes a background
# RDO file (myRDO_BKG.pool.root), then passes (ID, run, type, steps, setup)
# on to the parent initialiser.
# NOTE(review): the embedded original line numbers skip values (185-186,
# 190-196), so some statements are not visible here -- confirm the details
# against the complete file.
# Default event count when the caller did not pass one (substring test).
183 if "maxEvents" not in extra_args:
184 extra_args +=
" --maxEvents 5"
# NOTE(review): the assignment target of this expression (original line
# 186) is not visible -- verify where the string is stored.
187 (f
"Overlay_tf.py --AMIConfig {ID}"
188 f
" --inputBSFile {input_BS_minimum_bias_overlay[run]} --outputRDO_BKGFile myRDO_BKG.pool.root"
189 f
" --imf False {extra_args}")
197 super().
__init__(ID, run, type, steps, setup)
201 """Derivations test."""
203 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
# Puts together a Derivation_tf.py command string from the pieces encoded
# in `ID`, then passes (ID, run, type, steps, setup) on to the parent
# initialiser.
# NOTE(review): the embedded original line numbers skip values (207-208,
# 211, 213, 215-216, 222-224, 232-236, 238-240), so some statements are not
# visible here -- confirm the details against the complete file.
# `ID` is split on underscores: the first piece (lower-cased) selects the
# data type, every piece between the first and the last (upper-cased) is a
# derivation format name; the last piece is not used in the visible code.
204 test_def = ID.split(
"_")
205 data_type = test_def[0].lower()
# NOTE(review): the loop variable `format` shadows the builtin of that name.
206 formats = [format.upper()
for format
in test_def[1:-1]]
# NOTE(review): `threads` is bound only here in the visible text; original
# lines 207-208 are missing, so a default is presumably set there.
209 if setup.custom_threads
is not None:
210 threads = setup.custom_threads
212 if "maxEvents" not in extra_args:
# `base_events` (and `flush`, used below) are not defined in the visible
# text -- presumably on the missing original lines 213 and 215; TODO confirm.
# The event count scales with the thread count, plus one extra event.
214 events = threads * base_events + 1
# NOTE(review): indentation is lost in this view, so it cannot be told
# which of the following statements sit inside the maxEvents guard.
217 extra_args += f
" --maxEvents {events}"
# One `"DAOD_<FORMAT>": <flush>` entry per requested format, comma-separated.
218 format_flush =
", ".
join([f
"\"DAOD_{format}\": {flush}" for format
in formats])
# In the f-string, `{{` and `}}` are literal braces and the innermost pair
# is the placeholder, so the entries end up wrapped in one pair of braces.
219 extra_args += f
" --preExec 'flags.Output.TreeAutoFlush={{{format_flush}}}'"
# Default input: looked up by run and then by data type.
# NOTE(review): `input_AOD` does not appear in the visible import list --
# presumably imported on a line not shown; verify.
220 if "inputAODFile" not in extra_args:
221 extra_args += f
" --inputAODFile {input_AOD[run][data_type]}"
# Format names are passed space-separated after --formats.
# NOTE(review): the assignment target of this expression (original line
# 224) is not visible -- verify where the string is stored.
225 (f
"ATHENA_CORE_NUMBER={threads} Derivation_tf.py"
226 f
" --formats {' '.join(formats)}"
227 " --multiprocess --multithreadedFileValidation True"
228 " --athenaMPMergeTargetSize 'DAOD_*:0'"
229 " --sharedWriter True"
230 " --outputDAODFile myOutput.pool.root"
231 f
" --imf False {extra_args}")
# Per-format loop; its body (original lines 238-240) is not visible here.
237 for format
in formats:
241 super().
__init__(ID, run, type, steps, setup)
245 """Generation test."""
247 def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str =
"") ->
None:
# Puts together a Gen_tf.py command string that writes myEVNT.pool.root,
# then passes (ID, run, type, steps, setup) on to the parent initialiser.
# NOTE(review): the embedded original line numbers skip values (250, 256,
# 258, 260-261, 265), so some statements are not visible here -- confirm
# the details against the complete file.
# Default event count when the caller did not pass one (substring test).
248 if "maxEvents" not in extra_args:
249 extra_args +=
" --maxEvents 10"
# Default --ecmEnergy value chosen by run: 13000 for Run2, 13600 for Run3.
251 if "ecmEnergy" not in extra_args:
252 if run
is WorkflowRun.Run2:
253 extra_args +=
" --ecmEnergy 13000"
254 elif run
is WorkflowRun.Run3:
255 extra_args +=
" --ecmEnergy 13600"
# Presumably the `else` branch (original line 256 is not visible): every
# other run gets 14000.
257 extra_args +=
" --ecmEnergy 14000"
# The job-config identifier is the test ID with every occurrence of "gen"
# removed.
259 dsid = ID.replace(
"gen",
"")
# NOTE(review): the assignment target of this expression (original line
# 261) is not visible -- verify where the string is stored.
262 (f
"Gen_tf.py --jobConfig {dsid}"
263 " --outputEVNTFile myEVNT.pool.root"
264 f
" --imf False {extra_args}")
266 super().
__init__(ID, run, type, steps, setup)