ATLAS Offline Software
Loading...
Searching...
No Matches
events.py
Go to the documentation of this file.
1# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
2
3from .sugar import IOVSet, RunLumi, RunLumiType
4from .utils import worst
5
6from heapq import heapreplace, heappush, heappop
7from collections import defaultdict
8
9import logging
10log = logging.getLogger("DQUtils.events")
11
13 """
14 Build a states and empty_iovs list.
15 """
16 empty_iovs, states = [], []
17
18 for index, iovset in enumerate(iovsets):
19 empty_iov = None
20 if isinstance(iovset, IOVSet) and iovset.iov_type:
21 empty_iov = iovset.iov_type.empty()
22 if iovset and "channel" in empty_iov._fields:
23 empty_iov = empty_iov._replace(channel=iovset.first.channel)
24
25 empty_iovs.append(empty_iov)
26 states.append(empty_iov)
27
28 return empty_iovs, states
29
30def process_iovs(*iovsets):
31 """
32 Given a list of input iovsets containing each one channel, yield the state
33 of each iovset in the largest (since, until) range possible. The returned
34 states is the same length as the number of arguments to this function.
35
36 Example usage:
37
38 iovset = fetch_iovs("SHIFTOFL", runs=152166)
39 channels, iovsets = zip(*sorted(iovset.by_channel))
40 # `iovsets` here is a list of lists, each containing the iovs for one channel
41 for since, until, states in process_iovs(*iovsets):
42 print "From", since, "to", until
43 for state in states:
44 print "", state.channel, "is :", state.Code
45
46 or:
47
48 dcsofl_sct = fetch_iovs("DCSOFL", runs=152166, channels="SCTEA")
49 shiftofl_sct = fetch_iovs("SHIFTOFL", runs=152166, channels="SCTEA")
50 for since, until, (dcsofl, shiftofl) in process_iovs(dcsofl, shiftofl):
51 print ("From", since, "to", until,
52 "dcsofl=", dcsofl.Code,
53 "shiftofl=", shiftofl.Code)
54
55 Pitfall: IOVs can exist on the database where there are no state changes.
56 process_iovs will emit
57 """
58
59 empty_iovs, states = build_states_emptyiovs(*iovsets)
60 last_position = None
61
62 for position, index, beginning, iov in iov_yielder(*iovsets):
63 if last_position != position:
64 if last_position is not None:
65 assert last_position < position, "Inputs inconsistent. since !< until."
66 yield last_position, position, states
67 last_position = position
68
69 if beginning:
70 states[index] = iov
71 else:
72 states[index] = empty_iovs[index]
73
75 """
76 Like process_iovs but as well as a states list, also yields the set of
77 indices which changed since the last yield.
78 """
79 empty_iovs, states = build_states_emptyiovs(*iovsets)
80 last_position = None
81 changed_set = set()
82
83 for position, index, beginning, iov in iov_yielder(*iovsets):
84 if last_position != position:
85 if last_position is not None:
86 assert last_position < position, "Inputs inconsistent. since !< until."
87 yield last_position, position, states, changed_set
88 changed_set.clear()
89
90 last_position = position
91
92 changed_set.add(index)
93 if beginning:
94 states[index] = iov
95 else:
96 states[index] = empty_iovs[index]
97
98def process_iovs_mc(*iovsets):
99 """
100 process iovs for multiple channels
101 """
102
103 # Find the complete range
104 range_iov = IOVSet(iovs.range_iov for iovs in iovsets if iovs).range_iov
105 range_iov = IOVSet([range_iov])
106
107 iovsets = [iovs.by_channel for iovs in iovsets]
108 channels = set(iov for iovs in iovsets for iov in iovs.keys())
109
110 for channel in sorted(channels):
111 chan_iovsets = [iovs[channel] for iovs in iovsets]
112 for since, until, states in process_iovs(range_iov, *chan_iovsets):
113 yield since, until, channel, states[1:]
114
115def iov_yielder(*iovs):
116 """
117 Yields sets of iovs in "position" order. Each IoV gets yielded for its start
118 and for its end. (beginning = True and False respectively)
119 """
120 eventqueue = []
121 for index, iterable in enumerate(iovs):
122 it = iter(iterable)
123 try:
124 iov = next(it)
125 except StopIteration:
126 pass
127 else:
128 heappush(eventqueue, (iov.since, True, iov, index, it))
129
130 while eventqueue:
131 position, beginning, iov, index, it = eventqueue[0]
132
133 yield position, index, beginning, iov
134
135 if beginning:
136 next_key = iov.until, False
137
138 else:
139 try:
140 iov = next(it)
141 except StopIteration:
142 heappop(eventqueue)
143 continue
144
145 next_key = iov.since, True
146
147 heapreplace(eventqueue, next_key + (iov, index, it))
148
149def quantize_lb(since, until, states):
150 log.info("Evaluating: %s %s %s", since, until, states)
151 if len(states) == 1:
152 return states.pop()._replace(since=since, until=until)
153
154 result = worst(iov.Code for iov in states)
155 return states.pop()._replace(since=since, until=until, Code=result)
156
158
159 # "n" => 'new' this past lumi block with respect to the end event
160 #ncurrent_events, nended_events = defaultdict(set), defaultdict(set)
161 current_events, ended_events = set(), set()
162 iov_type = None
163 change_in_this_lb = False
164
165 for position, index, beginning, iov in iov_yielder(lbtime, iovs):
166
167 if index == 0:
168 # LBTIME events
169 lb_n = RunLumi(iov.Run, iov.LumiBlock)
170 log.info("LB: %s %s %s %s", ">" if beginning else "]", lb_n, iov.since, iov.until)
171
172 if current_events:
173 if beginning:
174 current_events.difference_update(ended_events)
175 ended_events.clear()
176
177 else:
178 #quantized_state = worst(iov.Code for iov in current_events)
179
180 yield lb_n, change_in_this_lb, current_events
181 # yield #iov_type(lb_n, lb_n+1, channel, quantized_state)
182
183 else:
184 if iov_type is None:
185 iov_type = type(iov)
186 change_in_this_lb = True
187 log.info("Event: %s", iov)
188 (current_events if beginning else ended_events).add(iov)
189
190def default_quantizing_function(current_events):
191 return worst(iov.Code for iov in current_events)
192
193def quantize_iovs(lbtime, iovs, quantizer=default_quantizing_function):
194 from .iov_arrangement import connect_adjacent_iovs
195 iovs = connect_adjacent_iovs(iovs)
196
197 current_events, ended_events = set(), set()
198 need_evaluation = channel = iov_type = None
199 last_lb_n = lb_n = RunLumi(0)
200 # ps => 'previous state'
201 previous_state = ps_since = ps_until = None
202
203 for position, index, beginning, iov in iov_yielder(lbtime, iovs):
204
205 if index == 0:
206 # Luminosity block started or finished
207
208 lb_n = RunLumi(iov.Run, iov.LumiBlock)
209 #print "LB:", ">" if beginning else "]", lb_n, iov.since, iov.until,
210
211 # We passed a run boundary - need emit.
212 emit_needed = lb_n.run != last_lb_n.run
213 if emit_needed:
214 need_evaluation = lb_n + 2
215 elif lb_n.lumi - last_lb_n.lumi > 1:
216 need_evaluation = lb_n + 1
217 log.warning(f"Missing LB range [{last_lb_n + 1}, {lb_n})")
218
219 last_lb_n = lb_n
220
221 # Defines whether something happened which requires
222 do_evaluation = ended_events or need_evaluation >= lb_n
223
224
225 #if do_evaluation:
226 # print "#",
227 #else:
228 # print need_evaluation >= lb_n, need_evaluation, lb_n,
229
230 if do_evaluation and beginning:
231 # LB start: Remove events which finished in the last LB.
232 if ended_events:
233 need_evaluation = lb_n + 2
234 current_events -= ended_events
235 ended_events.clear()
236
237 elif do_evaluation:
238 new_state = quantizer(current_events)
239 #print len(current_events),
240
241 if previous_state != new_state or emit_needed:
242 #print "State change!",
243 # State changed. Emit the previous IoV if there was one,
244 # record when this state started.
245
246 #print "State: ", lb_n, previous_state, "=>", new_state,
247 if previous_state is not None:
248 # This LB changed
249 iov = iov_type(ps_since, ps_until,
250 channel, previous_state)
251 #print
252 #print "== Emitted", iov,
253 yield iov
254
255 ps_since, previous_state = lb_n, new_state
256
257 if not beginning:
258 # The current state should last at least as long as until
259 # the end of the current lumiblock
260 ps_until = lb_n + 1
261
262 #print
263
264 else:
265 #print "---", ">" if beginning else "]", iov
266 if iov_type is None:
267 # First IoV we have come across.
268 iov_type, channel = type(iov), iov.channel
269
270 # Something happened. We need to evaluate this and next lumiblock
271 need_evaluation = lb_n + 2
272
273 # Record this iov in the current or ended events set
274 (current_events if beginning else ended_events).add(iov)
275
276 if previous_state is not None:
277 # There exists a state which hasn't been emitted yet. Do so.
278 iov = iov_type(ps_since, ps_until, channel, previous_state)
279 #print "Finished:", iov
280 yield iov
281
283 quantizer=default_quantizing_function):
284 from .iov_arrangement import connect_adjacent_iovs
285 iovs = connect_adjacent_iovs(iovs)
286
287 current_events, ended_events = set(), set()
288 evaluate_until = state = channel = iov_type = None
289 lb_n = last_lb_n = RunLumi(0)
290
291 for position, index, beginning, iov in iov_yielder(lbtime, iovs):
292
293 if index == 0:
294 # Luminosity block started or finished
295
296 if beginning:
297 # LB start: Remove events which finished since the last LB.
298 current_events -= ended_events
299 ended_events.clear()
300
301 else:
302 lb_n = RunLumi(iov.Run, iov.LumiBlock)
303 if lb_n.run != last_lb_n.run:
304 evaluate_until = lb_n + 2
305 elif lb_n.lumi - last_lb_n.lumi > 1:
306 evaluate_until = lb_n + 1
307 log.warning(f"Missing LB range [{last_lb_n + 1}, {lb_n})")
308 last_lb_n = lb_n
309
310 if evaluate_until >= lb_n:
311 state = quantizer(current_events)
312
313 if state is not None:
314 iov = iov_type(lb_n, lb_n+1, channel, state)
315 yield iov
316 else:
317 if iov_type is None:
318 # First IoV we have come across.
319 iov_type, channel = type(iov), iov.channel
320
321 # Record this iov in the current or ended events set
322 (current_events if beginning else ended_events).add(iov)
323 evaluate_until = lb_n + 2
324
325def quantize_iovs_slow(lbtime, iovs, quantizer=default_quantizing_function):
326 from .iov_arrangement import connect_adjacent_iovs
327 result = quantize_iovs_slow_interm(lbtime, iovs, quantizer)
328 return connect_adjacent_iovs(result)
329
330def quantize_iovs_slow_mc(lbtime, iovs, quantizer=default_quantizing_function):
331 from .iov_arrangement import connect_adjacent_iovs
332 iovs = [connect_adjacent_iovs(chan_iovs) for chan_iovs in iovs]
333
334 iov_type = None
335 current_events, ended_events = defaultdict(set), defaultdict(set)
336 current_state = defaultdict()
337 evaluate_until = defaultdict(int)
338 resultstore = defaultdict(lambda: IOVSet(iov_type=iov_type))
339 lb_n = last_lbn = RunLumi(0)
340
341 for position, index, beginning, iov in iov_yielder(lbtime, *iovs):
342
343 if index == 0:
344 # Luminosity block started or finished
345
346 if beginning:
347 # LB start: Remove events which finished in the last LB.
348 for key, c_ended_events in ended_events.items():
349 if key in current_events:
350 current_events[key] -= c_ended_events
351 c_ended_events.clear()
352
353 else:
354 #lb_n = RunLumi(iov.Run, iov.LumiBlock)
355 lb_n = iov.Run << 32 | iov.LumiBlock
356
357 if last_lbn >> 32 != lb_n >> 32:
358 # We passed a run boundary. Need to evaluate everything for
359 # the next run or two.
360 for channel in current_events:
361 evaluate_until[channel] = lb_n + 2
362
363 elif lb_n - last_lbn > 1:
364 # There is a gap in LB counting. Need to evaluate
365 # everything because of ended events.
366 log.warning(f"Missing LB range [{RunLumiType(last_lbn + 1)}, {RunLumiType(lb_n)})")
367 for channel in current_events:
368 evaluate_until[channel] = lb_n + 1
369
370 for channel, active_events in current_events.items():
371
372 if evaluate_until[channel] >= lb_n:
373 state = current_state[channel] = quantizer(active_events)
374
375 state = current_state[channel]
376
377 if state is not None:
378 part = (RunLumiType(lb_n), RunLumiType(lb_n+1),
379 channel, state)
380 resultstore[channel].add(*part)
381 last_lbn = lb_n
382 else:
383 if iov_type is None: iov_type = type(iov)
384 c = iov.channel
385 # Record this iov in the current or ended events set
386 (current_events[c] if beginning else ended_events[c]).add(iov)
387 evaluate_until[c] = lb_n + 2
388
389 return [value for key, value in sorted(resultstore.items())]
STL class.
bool add(const std::string &hname, TKey *tobj)
Definition fastadd.cxx:55
iov_yielder(*iovs)
Definition events.py:115
process_iovs_changed(*iovsets)
Definition events.py:74
quantize_lb(since, until, states)
Definition events.py:149
quantize_iovs_slow(lbtime, iovs, quantizer=default_quantizing_function)
Definition events.py:325
quantize_iovs_slow_mc(lbtime, iovs, quantizer=default_quantizing_function)
Definition events.py:330
quantize_iovs_slow_interm(lbtime, iovs, quantizer=default_quantizing_function)
Definition events.py:283
default_quantizing_function(current_events)
Definition events.py:190
process_iovs(*iovsets)
Definition events.py:30
process_iovs_mc(*iovsets)
Definition events.py:98
quantize_iovs(lbtime, iovs, quantizer=default_quantizing_function)
Definition events.py:193
build_states_emptyiovs(*iovsets)
Definition events.py:12
quantize_iovs_intermediate(lbtime, iovs)
Definition events.py:157