3 from .sugar
import IOVSet, RunLumi, RunLumiType
4 from .utils
import worst
6 from heapq
import heapreplace, heappush, heappop
7 from collections
import defaultdict
10 log = logging.getLogger(
"DQUtils.events")
14 Build a states and empty_iovs list.
16 empty_iovs, states = [], []
18 for index, iovset
in enumerate(iovsets):
20 if isinstance(iovset, IOVSet)
and iovset.iov_type:
21 empty_iov = iovset.iov_type.empty()
22 if iovset
and "channel" in empty_iov._fields:
23 empty_iov = empty_iov._replace(channel=iovset.first.channel)
25 empty_iovs.append(empty_iov)
26 states.append(empty_iov)
28 return empty_iovs, states
32 Given a list of input iovsets containing each one channel, yield the state
33 of each iovset in the largest (since, until) range possible. The returned
34 states is the same length as the number of arguments to this function.
38 iovset = fetch_iovs("SHIFTOFL", runs=152166)
39 channels, iovsets = zip(*sorted(iovset.by_channel))
40 # `iovsets` here is a list of lists, each containing the iovs for one channel
41 for since, until, states in process_iovs(*iovsets):
42 print "From", since, "to", until
44 print "", state.channel, "is :", state.Code
48 dcsofl_sct = fetch_iovs("DCSOFL", runs=152166, channels="SCTEA")
49 shiftofl_sct = fetch_iovs("SHIFTOFL", runs=152166, channels="SCTEA")
50 for since, until, (dcsofl, shiftofl) in process_iovs(dcsofl, shiftofl):
51 print ("From", since, "to", until,
52 "dcsofl=", dcsofl.Code,
53 "shiftofl=", shiftofl.Code)
55 Pitfall: IOVs can exist on the database where there are no state changes.
56 process_iovs will emit
62 for position, index, beginning, iov
in iov_yielder(*iovsets):
63 if last_position != position:
64 if last_position
is not None:
65 assert last_position < position,
"Inputs inconsistent. since !< until."
66 yield last_position, position, states
67 last_position = position
72 states[index] = empty_iovs[index]
76 Like process_iovs but as well as a states list, also yields the set of
77 indices which changed since the last yield.
83 for position, index, beginning, iov
in iov_yielder(*iovsets):
84 if last_position != position:
85 if last_position
is not None:
86 assert last_position < position,
"Inputs inconsistent. since !< until."
87 yield last_position, position, states, changed_set
90 last_position = position
92 changed_set.add(index)
96 states[index] = empty_iovs[index]
100 process iovs for multiple channels
104 range_iov = IOVSet(iovs.range_iov
for iovs
in iovsets
if iovs).range_iov
105 range_iov = IOVSet([range_iov])
107 iovsets = [iovs.by_channel
for iovs
in iovsets]
108 channels =
set(iov
for iovs
in iovsets
for iov
in iovs.keys())
110 for channel
in sorted(channels):
111 chan_iovsets = [iovs[channel]
for iovs
in iovsets]
112 for since, until, states
in process_iovs(range_iov, *chan_iovsets):
113 yield since, until, channel, states[1:]
117 Yields sets of iovs in "position" order. Each IoV gets yielded for its start
118 and for its end. (beginning = True and False respectively)
121 for index, iterable
in enumerate(iovs):
125 except StopIteration:
128 heappush(eventqueue, (iov.since,
True, iov, index, it))
131 position, beginning, iov, index, it = eventqueue[0]
133 yield position, index, beginning, iov
136 next_key = iov.until,
False
141 except StopIteration:
145 next_key = iov.since,
True
147 heapreplace(eventqueue, next_key + (iov, index, it))
150 log.info(
"Evaluating: %s %s %s", since, until, states)
152 return states.pop()._replace(since=since, until=until)
154 result =
worst(iov.Code
for iov
in states)
155 return states.pop()._replace(since=since, until=until, Code=result)
161 current_events, ended_events =
set(),
set()
163 change_in_this_lb =
False
165 for position, index, beginning, iov
in iov_yielder(lbtime, iovs):
169 lb_n =
RunLumi(iov.Run, iov.LumiBlock)
170 log.info(
"LB: %s %s %s %s",
">" if beginning
else "]", lb_n, iov.since, iov.until)
174 current_events.difference_update(ended_events)
180 yield lb_n, change_in_this_lb, current_events
186 change_in_this_lb =
True
187 log.info(
"Event: %s", iov)
188 (current_events
if beginning
else ended_events).
add(iov)
191 return worst(iov.Code
for iov
in current_events)
194 from .iov_arrangement
import connect_adjacent_iovs
197 current_events, ended_events =
set(),
set()
198 need_evaluation = channel = iov_type =
None
201 previous_state = ps_since = ps_until =
None
203 for position, index, beginning, iov
in iov_yielder(lbtime, iovs):
208 lb_n =
RunLumi(iov.Run, iov.LumiBlock)
212 emit_needed = lb_n.run != last_lb_n.run
214 need_evaluation = lb_n + 2
215 elif lb_n.lumi - last_lb_n.lumi > 1:
216 need_evaluation = lb_n + 1
217 log.warning(f
"Missing LB range [{last_lb_n + 1}, {lb_n})")
222 do_evaluation = ended_events
or need_evaluation >= lb_n
230 if do_evaluation
and beginning:
233 need_evaluation = lb_n + 2
234 current_events -= ended_events
238 new_state = quantizer(current_events)
241 if previous_state != new_state
or emit_needed:
247 if previous_state
is not None:
249 iov = iov_type(ps_since, ps_until,
250 channel, previous_state)
255 ps_since, previous_state = lb_n, new_state
268 iov_type, channel =
type(iov), iov.channel
271 need_evaluation = lb_n + 2
274 (current_events
if beginning
else ended_events).
add(iov)
276 if previous_state
is not None:
278 iov = iov_type(ps_since, ps_until, channel, previous_state)
283 quantizer=default_quantizing_function):
284 from .iov_arrangement
import connect_adjacent_iovs
287 current_events, ended_events =
set(),
set()
288 evaluate_until = state = channel = iov_type =
None
291 for position, index, beginning, iov
in iov_yielder(lbtime, iovs):
298 current_events -= ended_events
302 lb_n =
RunLumi(iov.Run, iov.LumiBlock)
303 if lb_n.run != last_lb_n.run:
304 evaluate_until = lb_n + 2
305 elif lb_n.lumi - last_lb_n.lumi > 1:
306 evaluate_until = lb_n + 1
307 log.warning(f
"Missing LB range [{last_lb_n + 1}, {lb_n})")
310 if evaluate_until >= lb_n:
311 state = quantizer(current_events)
313 if state
is not None:
314 iov = iov_type(lb_n, lb_n+1, channel, state)
319 iov_type, channel =
type(iov), iov.channel
322 (current_events
if beginning
else ended_events).
add(iov)
323 evaluate_until = lb_n + 2
326 from .iov_arrangement
import connect_adjacent_iovs
331 from .iov_arrangement
import connect_adjacent_iovs
335 current_events, ended_events = defaultdict(set), defaultdict(set)
336 current_state = defaultdict()
337 evaluate_until = defaultdict(int)
338 resultstore = defaultdict(
lambda: IOVSet(iov_type=iov_type))
341 for position, index, beginning, iov
in iov_yielder(lbtime, *iovs):
348 for key, c_ended_events
in ended_events.items():
349 if key
in current_events:
350 current_events[key] -= c_ended_events
351 c_ended_events.clear()
355 lb_n = iov.Run << 32 | iov.LumiBlock
357 if last_lbn >> 32 != lb_n >> 32:
360 for channel
in current_events:
361 evaluate_until[channel] = lb_n + 2
363 elif lb_n - last_lbn > 1:
366 log.warning(f
"Missing LB range [{RunLumiType(last_lbn + 1)}, {RunLumiType(lb_n)})")
367 for channel
in current_events:
368 evaluate_until[channel] = lb_n + 1
370 for channel, active_events
in current_events.items():
372 if evaluate_until[channel] >= lb_n:
373 state = current_state[channel] = quantizer(active_events)
375 state = current_state[channel]
377 if state
is not None:
378 part = (RunLumiType(lb_n), RunLumiType(lb_n+1),
380 resultstore[channel].
add(*part)
383 if iov_type
is None: iov_type =
type(iov)
386 (current_events[c]
if beginning
else ended_events[c]).
add(iov)
387 evaluate_until[c] = lb_n + 2
389 return [value
for key, value
in sorted(resultstore.items())]