ATLAS Offline Software
Loading...
Searching...
No Matches
menu_config_tests.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
3'''
4Tests to verify generated menus are valid.
5
6Ported from TrigConfStorage/ConfigurationCheck.cxx and
7HLT/Config/Utility/HLTMenuConfig.py, see [ATR-19830].
8
9Designed to be used by the `verify_menu_config.py` script.
10'''
11
12import re
13from enum import Enum
14from collections import Counter
15
16from TriggerMenuMT.HLT.Menu.SignatureDicts import getListOfSignatureStrings
17
18from AthenaCommon.Logging import logging
# Module-level logger used by the verifications below to report details of
# names that break the conventions.
log = logging.getLogger('TriggerMenuConfigTest')
log.info("Importing %s", __name__)
21
class TriggerLevel(Enum):
    '''
    Trigger levels for which a menu can be verified.

    The value of each member is the string that the first "_"-separated
    part of a name at that level is compared against.
    '''
    HLT = "HLT"
    L1 = "L1"
25
26
def __init__(self, description):
    '''
    Set up a verification that has not recorded any failure yet.

    description -- short human-readable statement of what is verified.
    '''
    self.description = description
    # Filled in by run(); an empty list means nothing was found wrong.
    self.failures = []
31
def run(self, config):
    '''
    Perform the verification on `config`; every subclass must override it.

    Raises NotImplementedError when no override has been provided.
    '''
    raise NotImplementedError("ConfigVerification subclass must implement run()")
34
35
def __init__(self):
    '''Label this verification as the chain-name uniqueness check.'''
    super(UniqueChainNames, self).__init__(
        description="Chain names are unique")
40
def run(self, config):
    '''
    Record every chain name that occurs more than once.

    config -- mapping whose "chains" entry provides the chain names
              through its keys().
    '''
    # NOTE(review): if config["chains"] is a plain dict its keys are unique
    # by construction and this check can never fail - confirm how the
    # configuration is loaded.
    occurrences = Counter(config["chains"].keys())
    self.failures = [name for name, n_seen in occurrences.items()
                     if n_seen > 1]
46
47
def __init__(self):
    '''Label this verification as the chain-counter sequence check.'''
    super(ConsecutiveChainCounters, self).__init__(
        description="Chain counters are consecutive 1..N")
52
def run(self, config):
    '''
    Record every chain counter that does not follow its predecessor by one.

    Counters are taken from the "counter" field of each entry of
    config["chains"], in the iteration order of that mapping. The first
    counter is compared against 0, so the sequence has to start at 1.
    The failure list is rebuilt on every call, so running the same
    instance again does not accumulate stale entries.
    '''
    counters = [chain["counter"] for chain in config["chains"].values()]
    # Pair each counter with the one before it (0 ahead of the first).
    self.failures = [
        current for previous, current in zip([0] + counters, counters)
        if current != previous + 1
    ]
61
62
'''
Verifies that chain names start with the expected prefix, and have their
parts in the correct order by type, as well as any trigger level specific
restrictions.
'''

# Signature type strings for each trigger level, listed in the order in
# which they have to appear within a name.
_SIGNATURE_TYPE_ORDER = {
    TriggerLevel.HLT: getListOfSignatureStrings(),
    # TODO: import list of signatures items from L1 code
    TriggerLevel.L1: [
        "EM", "MU", "TAU", "J", "XE", "HT"
    ],
}
77
def __init__(self, trigger_level):
    '''
    Set up the name-structure check for one trigger level.

    trigger_level -- TriggerLevel member; it selects both the signature
                     ordering and the convention applied by run().
    '''
    super(StructuredChainNames, self).__init__(
        description="Chain names are structured in the expected way")
    self._trigger_level = trigger_level
    # Keep the ordering for this level on the instance: the name-matching
    # helpers read self._signature_type_order, and a bare lookup here
    # would leave that attribute unset.
    self._signature_type_order = self._SIGNATURE_TYPE_ORDER[trigger_level]
84
def run(self, config):
    '''
    Collect the names that break the convention of this trigger level.

    For HLT the keys of config["chains"] are checked; for L1 the "name"
    field of every entry of config["items"]. For any other level the
    failure list is left untouched.
    '''
    if self._trigger_level == TriggerLevel.HLT:
        candidates = config["chains"].keys()
        follows_convention = self._name_matches_hlt_convention
    elif self._trigger_level == TriggerLevel.L1:
        candidates = [item["name"] for item in config["items"].values()]
        follows_convention = self._name_matches_l1_convention
    else:
        return

    self.failures = [name for name in candidates
                     if not follows_convention(name)]
94
def _name_matches_hlt_convention(self, name):
    '''
    True if `name` contains "_L1" and also satisfies the conventions
    shared by all trigger levels, otherwise False.
    '''
    return "_L1" in name and self._matches_shared_conventions(name)
97
def _name_matches_l1_convention(self, name):
    '''
    True if `name` satisfies the conventions shared by all trigger levels
    and its FEX items are in order, otherwise False.
    '''
    def fex_items_in_order(item_name):
        '''
        Where multiple FEX algorithms are used for the same item type,
        they should be in alphabetical order.
        '''
        fex_pattern = r"\d*([egj])({})[A-Z]*\d+s?".format(
            "|".join(self._signature_type_order))
        # Each match is a (FEX letter, item type) pair.
        fex_items = re.findall(fex_pattern, item_name)

        for item_type in self._signature_type_order:
            fexes_of_type = [fex for fex, matched_type in fex_items
                             if matched_type == item_type]
            if fexes_of_type != sorted(fexes_of_type):
                return False
        return True

    # The shared check goes first, so the FEX check is skipped for names
    # that already fail it.
    return self._matches_shared_conventions(name) and fex_items_in_order(name)
116
def _matches_shared_conventions(self, name):
    '''
    True if name starts with level prefix, and all signature
    types are in the correct order, otherwise False.
    '''
    # The signature objects in each item should be ordered by type, in the
    # order defined in _SIGNATURE_TYPE_ORDER.
    signature_types = "|".join(self._signature_type_order)
    sig_type_pattern = re.compile(
        r"_\d*[egj]?({})\d+s?".format(signature_types))

    # NOTE: special handling of probe legs (moving the signature carrying
    # the "_probe" suffix to the end before checking the order) was
    # prototyped here but is disabled until it has been discussed.

    def items_in_order(part):
        # NOTE: a part containing the word "leg" can yield a spurious "g"
        # match; replacing "leg" beforehand was considered but is not done.
        indices = [self._signature_type_order.index(sig_type)
                   for sig_type in sig_type_pattern.findall(part)]

        in_order = indices == sorted(indices)
        if not in_order:
            log.error("[StructuredChainNames::items_in_order] %s NOT SORTED!", indices)
        return in_order

    def are_signatures_in_order(name_parts):
        # All non-topo parts are checked together as one "_"-joined string.
        to_check = ["".join(f"_{p}" for p in name_parts if "-" not in p)]

        # Sections of topo item parts are checked for ordering independently.
        # NOTE(review): these sections carry no leading "_", which the
        # pattern requires, so as written they never produce a match -
        # confirm whether that is intended.
        for topo in (p for p in name_parts if "-" in p):
            to_check.extend(topo.split("-"))

        # Evaluate every part exactly once so each problem is logged once.
        unordered = [part for part in to_check if not items_in_order(part)]
        for part in unordered:
            log.error("[StructuredChainNames::are_signatures_in_order] %s not in order!", part)
        return not unordered

    # Name must begin with the trigger level, and contain at least one item.
    parts = name.split("_")

    result = all((len(parts) > 1,
                  parts[0] == self._trigger_level.value,
                  are_signatures_in_order(parts[1:])))
    if not result:
        log.error("[StructuredChainNames::_matches_shared_conventions] chain doesn't match convention: parts[0] = %s, value = %s, parts[1:] = %s, signature_types = %s",
                  parts[0], self._trigger_level.value, parts[1:], signature_types)

    return result
189
190
def __init__(self):
    '''Label this verification as the CTP item count / CTP id range check.'''
    super(RestrictedCTPIDs, self).__init__(
        description="Less than 512 CTP items, and no CTP id greater than 512")
195
def run(self, config):
    '''
    Record a message if more than 512 items are defined, followed by the
    name (key) of every item whose "ctpid" is greater than 512.

    Items are read from config["items"]. The failure list is rebuilt on
    every call, so running the same instance again does not accumulate
    stale entries.
    '''
    limit = 512
    ctp_ids = {name: item["ctpid"]
               for name, item in config["items"].items()}

    failures = []
    if len(ctp_ids) > limit:
        failures.append("More than 512 CTP items defined")
    # NOTE(review): an id of exactly 512 is accepted here; if CTP ids are
    # zero-based this comparison is off by one - confirm the valid range.
    failures.extend(name for name, ctp_id in ctp_ids.items()
                    if ctp_id > limit)
    self.failures = failures
205
206
def __init__(self):
    '''Label this verification as the Partial Event Building consistency check.'''
    super(PartialEventBuildingChecks, self).__init__(
        description='Config consistency of Partial Event Building')
211
def run(self, config):
    '''
    Check that chains and the streams they write to agree on partial
    event building (PEB), recording one message per inconsistency.

    A chain counts as a PEB chain if its name contains an event building
    identifier surrounded by underscores, or if one of its sequencers
    contains 'PEBInfoWriter'. Reads config['chains'] (each entry with
    'sequencers' and 'streams') and config['streams'] (each entry with
    'forceFullEventBuilding').

    The failure list is rebuilt on every call.
    '''
    from TriggerMenuMT.HLT.Menu import EventBuildingInfo
    eb_identifiers = EventBuildingInfo.getAllEventBuildingIdentifiers()

    self.failures = []

    for chain_name, chain_config in config['chains'].items():
        peb_identifiers = [idf for idf in eb_identifiers if '_'+idf+'_' in chain_name]
        peb_writers = [seq for seq in chain_config['sequencers'] if 'PEBInfoWriter' in seq]

        is_peb_chain = bool(peb_identifiers or peb_writers)

        # Check streaming configuration
        for stream_name in chain_config['streams']:
            if stream_name not in config['streams']:
                self.failures.append(
                    'Stream {:s} for chain {:s} is not defined in streaming configuration'.format(
                        stream_name, chain_name))
                # Nothing more can be checked for an undefined stream;
                # looking it up below would raise KeyError.
                continue

            is_feb_stream = config['streams'][stream_name]['forceFullEventBuilding']

            if is_peb_chain and is_feb_stream:
                self.failures.append(
                    'PEB chain {:s} streamed to a full-event-building stream {:s} '
                    '(forceFullEventBuilding=True)'.format(
                        chain_name, stream_name))

            elif not is_peb_chain and not is_feb_stream:
                self.failures.append(
                    'Full-event-building chain {:s} streamed to the stream {:s} which allows partial '
                    'event building (forceFullEventBuilding=False)'.format(
                        chain_name, stream_name))

        if not is_peb_chain:
            # Not a PEB chain, skip further PEB-specific checks
            continue

        if len(peb_identifiers) != 1:
            self.failures.append(
                '{:s} has {:d} event building identifiers'.format(chain_name, len(peb_identifiers)))

        if len(peb_writers) != 1:
            self.failures.append(
                '{:s} has {:d} PEBInfoWriter sequences'.format(chain_name, len(peb_writers)))

        # The writer sequence is expected to be named after the identifier.
        if peb_identifiers and peb_writers and not peb_writers[0].endswith(peb_identifiers[0]):
            self.failures.append(
                '{:s} PEB sequence name {:s} doesn\'t end with PEB identifier {:s}'.format(
                    chain_name, peb_writers[0], peb_identifiers[0]))
259
260
261
# Verifications to run for each trigger level. One instance of each check
# is created when this module is imported.
menu_tests = {
    # Checks that read config["chains"] (and config["streams"]).
    TriggerLevel.HLT: [
        UniqueChainNames(),
        ConsecutiveChainCounters(),
        StructuredChainNames(TriggerLevel.HLT),
        PartialEventBuildingChecks(),
    ],
    # Checks that read config["items"].
    TriggerLevel.L1: [
        RestrictedCTPIDs(),
        StructuredChainNames(TriggerLevel.L1),
    ]
}
Definition index.py:1
Definition run.py:1