ATLAS Offline Software
Loading...
Searching...
No Matches
DQDefects/python/db.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
3"""
4Authors: Peter Waller <peter.waller@cern.ch> and "Peter Onyisi" <peter.onyisi@cern.ch>
5
6Please see https://twiki.cern.ch/twiki/bin/viewauth/Atlas/DQDefects
7
8This file defines DefectsDB and some of its public interface.
9
10To separate out the code into digestible units, this class is split into mixins
11which can be found defined in other files in this directory.
12
13The bulk of the core functionality is found in this file.
14"""
15
16
17from logging import getLogger; log = getLogger("DQDefects.db")
18
19from contextlib import contextmanager
20
21from DQUtils import fetch_iovs, IOVSet
22from DQUtils.sugar.iovtype import IOVType
23from DQUtils.channel_mapping import list_to_channelselection
24from DQUtils.sugar import RunLumi
25
26from . import DEFAULT_CONNECTION_STRING, DEFECTS_FOLDER
27
28from .exceptions import DefectExistsError, DefectUnknownError
29from .folders import DefectsDBFoldersMixin
30from .ids import DefectsDBIDsNamesMixin, choose_new_defect_id
31from .tags import DefectsDBTagsMixin, tagtype
32from .virtual_mixin import DefectsDBVirtualDefectsMixin
33from .virtual_calculator import calculate_virtual_defects
34from typing import Union, Tuple, Optional, Iterable, Collection, Mapping
35
37 DefectsDBTagsMixin,
38 DefectsDBFoldersMixin,
39 DefectsDBIDsNamesMixin,
40 object):
41 """
42 Implementation is split into mixins.
43
44 DefectsDBIDsNamesMixin:
45 Defines functions for dealing with defect IDs
46
47 DefectsDBFoldersMixin:
48 Logic for loading or creating the COOL folder/database
49
50 DefectsDBTagsMixin:
51 Functions for managing tags
52
53 DefectsDBVirtualDefectsMixin:
54 Internal code for managing and computing virtual defects
55
56 Public interface is nominally defined in this class (DefectsDB).
57 """
58
def __init__(self, connection_string: str = DEFAULT_CONNECTION_STRING,
             read_only: bool = True, create: bool = False, tag: Union[str, Tuple] = "HEAD") -> None:
    """
    Set up a DefectsDB instance.

    The COOL folder used for defects is named by `DEFECTS_FOLDER`, which
    is imported from the top level of this package.

    Parameters:
        `connection_string` : A COOL connection string, or a string ending
                              in .db. In the latter case, the database is
                              assumed to be a sqlite database at that path.
        `read_only` : Indicates whether insertions will be disallowed on
                      this connection. (Default: True)
        `create` : If True, attempt to create the database if it doesn't
                   exist. (Default: False)
        `tag` : The COOL tag(s) to use. Either a single string used
                for both defects and virtual defect logic, or a
                2-tuple (defecttag, logictag). An empty string is
                treated like the default, which is HEAD for both
                defects and logic. Either folder-level or hierarchical
                tags can be given.

    Raises:
        TypeError if `tag` is neither a string nor a 2-element sequence.
    """
    self.connection_string = connection_string
    self._read_only = read_only
    self._create = create

    if isinstance(tag, str):
        # One string serves both folders; an empty string means HEAD.
        tag_pair = tagtype(tag, tag) if tag else tagtype("HEAD", "HEAD")
    else:
        try:
            tag = tagtype._make(tag)
        except Exception:
            raise TypeError('tag argument must be a 2-element sequence')
        if len(tag) != 2:
            raise TypeError('tag argument must be a 2-element sequence')
        tag_pair = tag
    self._tag = tagtype(tag_pair[0], tag_pair[1])

    # COOL has no way of emptying a storage buffer. Creating a new storage
    # buffer flushes the old one. Therefore, if an exception happens
    # halfway through setting up a storage buffer, subsequent operations
    # risk flushing a half-completed operation. In that case, we refuse to
    # do any more write operations with that database connection.
    self._bad_state = False

    super(DefectsDB, self).__init__()

    if create:
        # Accessing the folder property is what triggers its creation.
        self.defects_folder
        # NOTE(review): the listing this was taken from skips one source
        # line right here. Presumably it touched `self.defect_logic_folder`
        # in the same way - confirm against the repository before merging.
110
def __del__(self) -> None:
    """
    Release this object's connections when it is destroyed.

    Chaining destructors through inheritance is not possible in the
    general case with the way the mixins are (ab)used, so the cleanup
    call is made explicitly from here instead.
    """
    self._clear_connections()
118
def create_defect(self, name: str, description: str) -> None:
    """
    Create a new type of defect, choosing its ID automatically.

    The ID is picked by `choose_new_defect_id` from the current
    `defect_id_map` and the defect name; the actual creation is then
    delegated to `_create_defect_with_id`.

    Parameters:
        `name` : name of the defect
        `description` : a human-readable description of the defect

    Nothing is returned; the chosen ID is only written to the log.
    """
    new_id = choose_new_defect_id(self.defect_id_map, name)
    log.info("Creating new defect %s: system ID %08x", name, new_id)
    self._create_defect_with_id(new_id, name, description)
133
def _create_defect_with_id(self, did: int, name: str, description: str) -> None:
    """
    Create a new type of defect under an explicitly given defect ID.
    See also: `create_defect`.

    Parameters:
        `did` : defect_id
        `name` : name of the defect (encoded as ASCII for the channel name)
        `description` : a human-readable description of the defect
                        (encoded as UTF-8)

    Raises:
        DefectExistsError if `did` is already in use, or if `name`
        resolves to a defect that already exists.
    """
    log.info("Creating new defect %s (0x%08x)", name, did)

    if did in self.defect_ids:
        raise DefectExistsError(did)

    # The name must NOT resolve: an unknown-defect error is the good case.
    try:
        existing = self.normalize_defect_names(name)
    except DefectUnknownError:
        pass
    else:
        raise DefectExistsError(f'Defect {existing} already exists')

    channel_name = name.encode('ascii')
    channel_description = description.encode('utf-8')
    self.defects_folder.createChannel(did, channel_name, channel_description)
    self._new_defect(did, name)
156
def retrieve(self, since: Optional[Union[int, Tuple[int,int], RunLumi]] = None,
             until: Optional[Union[int, Tuple[int,int], RunLumi]] = None,
             channels: Optional[Iterable[Union[str,int]]] = None, nonpresent: bool = False,
             primary_only: bool = False, ignore: Optional[Collection[str]] = None,
             with_primary_dependencies: bool = False, intersect: bool = False,
             with_time: bool = False, evaluate_full: bool = True) -> IOVSet:
    """
    Retrieve defects from the database.

    Parameters:
        `since`, `until` : IoV range to query (Default: All)
        `channels` : A list of channels to query. Can contain a mixture of
                     defect names and ids. (Default: None means all
                     channels, including all virtual ones unless
                     `primary_only` is set)
        `nonpresent` : If True, also return IoVs whose `present` flag is
                       not set. By default only IoVs which are
                       currently "present" are returned.
        `primary_only` : Only return primary IoVs, no virtual ones.
        `ignore` : Set of defects which won't be treated as bad. Must be
                   an actual `set` (or None).
        `with_primary_dependencies` : When querying virtual flags, also get
            primary flags which went into the calculation. Only has an
            effect when `channels` is given.
        `intersect` : Intersect the result with the query range so that no
                      iov spans outside the query range
        `with_time` : Also retrieves the time the defect was inserted
                      ~2x slower queries, doesn't work for virtual defects
        `evaluate_full` : If specified, also compute the `comment` and
                          `recoverable` fields of virtual defects.
                          Causes a ~0.6x slowdown

    Returns:
        An IOVSet with the matching IoVs. An empty IOVSet is returned
        when virtual defects were requested but no primary IoVs were
        found.

    Raises:
        RuntimeError if `ignore` is given but is not a `set`.
        AssertionError if virtual channels are requested together with
        `primary_only` or `with_time`.
    """
    if ignore is not None and not isinstance(ignore, set):
        raise RuntimeError("ignore parameter should be set type")

    # Only set when the COOL query is widened to "everything" further
    # down; it then drives the final filtering step.
    desired_channels = None
    # Figure out the IDs of channels to query and their virtuality
    if channels is not None:
        query_channels = set(self.defect_names_as_ids(channels))
        # Anything that is not a known primary ID is treated as virtual
        virtual_channels = query_channels - self.defect_ids
        primary_channels = query_channels & self.defect_ids
    else:
        # No channel list given (None) means query all channels
        # (including all virtual)
        if primary_only:
            virtual_channels = None
        else:
            virtual_channels = self.virtual_defect_ids
        primary_channels = set(self.defect_ids)
        query_channels = None # (all)

    primary_output_names = [self.defect_id_map[pid] for pid in primary_channels]
    virtual_output_names = [] # (optionally populated below)

    # Resolve virtual channels to query here, and the primary dependents.
    if virtual_channels:
        # NOTE: these checks are plain asserts and so vanish under `python -O`
        assert not primary_only, "Requested virtual channels with primary_only=True"
        assert not with_time, "with_time flag only works for primary defects"
        virtual_output_names = [self.virtual_defect_id_map[vid]
                                for vid in virtual_channels]

        ordered_logics = self._resolve_evaluation_order(virtual_output_names)

        if channels is not None:
            # Since not all channels are being queried, it is necessary to
            # add the desired primary channels to the query
            primary_needed = self.resolve_primary_defects(ordered_logics)
            primary_channels |= set(self.defect_names_as_ids(primary_needed))
            query_channels = primary_channels

            if with_primary_dependencies:
                primary_output_names.extend(sorted(primary_needed))

        for logic in ordered_logics:
            logic.set_evaluation(evaluate_full)
    else:
        ordered_logics = []

    # Figure out if the set of channels will produce too many ranges for COOL
    if query_channels is not None:
        query_channels = sorted(query_channels)
        query_ranges = list_to_channelselection(query_channels, None, True)

        if len(query_ranges) >= 50:
            # We're querying too many ranges. Query everything, filter later.
            desired_channels = set(primary_output_names + virtual_output_names)
            query_channels = None # (everything)

    # Retrieve primary IoVs
    primary_iovs = fetch_iovs(self.defects_folder, since, until,
                              query_channels, self.defects_tag,
                              named_channels=True, unicode_strings=True,
                              with_time=with_time)

    # Calculate virtual defects (if necessary)
    if primary_only or not virtual_channels:
        result = primary_iovs
    else:
        if not primary_iovs:
            # Nothing to evaluate the virtual defects on
            return IOVSet()
        args = (primary_iovs, ordered_logics,
                virtual_output_names, primary_output_names,
                since, until, ignore)
        result = calculate_virtual_defects(*args)

    # Filter out results which have their present bit removed
    # (unless otherwise specified)
    if not nonpresent:
        result = IOVSet(iov for iov in result if iov.present)

    # Filter out channels which weren't queried by the user
    # (to get around 50 channelselection COOL limit)
    if desired_channels:
        result = IOVSet(iov for iov in result
                        if iov.channel in desired_channels)

    if intersect:
        result = result.intersect_range((since, until))

    return result
272
@property
def storage_buffer(self):
    """
    Gives a new context manager for use with the with statement, e.g.:
        with ddb.storage_buffer:
            for d in many_defects:
                ddb.insert(...defect...)

    On entry the COOL storage buffer of the defects folder is set up; it
    is flushed only if the body of the `with` statement completes without
    raising. If anything at all is raised inside the body (including
    KeyboardInterrupt and SystemExit), the buffer is not flushed, the
    connection is marked as being in a bad state and the exception is
    re-raised.

    Raises:
        AssertionError if an earlier storage buffer on this connection
        was aborted (see the comment in the DefectsDB constructor).
    """
    # Explicit raise instead of an `assert` statement so that the guard is
    # still enforced under `python -O`; exception type and message are the
    # same as before.
    if self._bad_state:
        raise AssertionError("Please see comment in DefectsDB constructor")

    @contextmanager
    def thunk():
        log.debug("setupStorageBuffer()")
        self.defects_folder.setupStorageBuffer()
        try:
            yield
        except BaseException:
            # BaseException rather than Exception: an interrupt also leaves
            # a half-filled buffer behind, and the next setupStorageBuffer()
            # would flush it unless the connection is marked as bad.
            log.warning("Exception raised during DefectsDB.storage_buffer. "
                        "Not flushing storage buffer - but COOL has no way "
                        "to empty it. ")
            self._bad_state = True
            raise
        else:
            log.debug("flushStorageBuffer()")
            self.defects_folder.flushStorageBuffer()
    return thunk()
299
def insert(self, defect_id: Union[str, int], since: int, until: int, comment: str, added_by: str,
           present: bool = True, recoverable: bool = False) -> None:
    """
    Insert a new defect into the database, using this connection's
    defects tag.

    Parameters:
        `defect_id` : The name or channel identifier for the defect
        `since`, `until` : The COOL IoV for the range
        `comment` : String255 arbitrary text comment
        `added_by` : The user name or "sys:"-prefixed string of the
                     application that inserted the defect
        `present` : The state of the flag (Default: True)
        `recoverable` : Indicates whether there is any possibility to set
                        present=False in the future (Default: False)

    Returns whatever `_insert` returns.
    """
    do_insert = self._insert
    return do_insert(defect_id, since, until, comment, added_by,
                     present, recoverable, self.defects_tag)
317
def _insert_iov(self, iov: IOVType, tag: str) -> None:
    """
    Insert an IOV object under the given tag.

    The fields of the record are not in the order `_insert` takes its
    arguments, so they are picked out one by one here.
    """
    record_fields = (iov.channel, iov.since, iov.until, iov.comment,
                     iov.user, iov.present, iov.recoverable)
    return self._insert(*record_fields, tag)
325
def _insert(self, defect_id: Union[str, int], since: Union[int, Tuple[int, int], RunLumi],
            until: Union[int, Tuple[int, int], RunLumi], comment: str, added_by: str,
            present: bool = True, recoverable: bool = False, tag: str = 'HEAD') -> None:
    """
    Implementation of insert, allows tag specification for internal
    functions

    Parameters:
        `defect_id` : name or channel ID of the defect; resolved with
                      `defect_chan_as_id(defect_id, True)`
        `since`, `until` : the IoV range, passed through to the folder's
                           `storeObject`
        `comment`, `added_by` : stored UTF-8 encoded in the payload
                                fields "comment" and "user"
        `present`, `recoverable` : stored in the payload as given
        `tag` : tag to store under (ASCII encoded); the final argument to
                `storeObject` is True for every tag other than 'HEAD'

    Raises:
        AssertionError if the connection is read-only, or if an earlier
        storage buffer on this connection was aborted (see the comment in
        the DefectsDB constructor).
    """
    # Explicit raises instead of `assert` statements so that these guards
    # are still enforced under `python -O`; exception type and messages
    # are the same as before.
    if self._read_only:
        raise AssertionError("Insertion on read-only database")
    if self._bad_state:
        raise AssertionError("Please see comment in DefectsDB constructor")

    # Force load of defects_folder to populate _defect_payload
    store = self.defects_folder.storeObject
    p = self._defect_payload

    p["present"] = present
    p["recoverable"] = recoverable
    p["user"] = added_by.encode('utf-8')
    p["comment"] = comment.encode('utf-8')

    defect_id = self.defect_chan_as_id(defect_id, True)

    store(since, until, p, defect_id, tag.encode('ascii'),
          tag != 'HEAD')
349
def insert_multiple(self, defect_list: Iterable[IOVType],
                    tag: str = 'HEAD', use_flask: bool = False,
                    flask_cool_target: str = 'oracle://ATONR_COOLOFL_GPN/ATLAS_COOLOFL_GLOBAL',
                    flask_auth: Mapping[str, str] = {},
                    flask_db: str = 'CONDBR2',
                    flask_uri: str = 'https://cool-proxy-app.cern.ch/cool/multi_iovs'
                    ):
    """
    Insert several defect IoVs under one tag.

    Parameters:
        `defect_list` : the IOV objects to insert
        `tag` : the tag to insert under (Default: 'HEAD')
        `use_flask` : if False (the default), every IoV is inserted
                      through this connection with `_insert_iov`. If
                      True, the whole list is handed to
                      `_insert_multiple_flask` instead.
        `flask_cool_target`, `flask_auth`, `flask_db` : passed on
                      unchanged to `_insert_multiple_flask`
        `flask_uri` : URI of the Flask server; the environment variable
                      DQM_COOL_FLASK_URI takes precedence when it is set
    """
    if use_flask:
        # Allow for override from environment
        import os
        flask_uri = os.environ.get('DQM_COOL_FLASK_URI', flask_uri)

        log.debug(f'Flask server URI: {flask_uri}')
        self._insert_multiple_flask(defect_list, tag, flask_cool_target,
                                    flask_auth, flask_db, flask_uri)
        return

    for iov in defect_list:
        self._insert_iov(iov, tag)
368
def _insert_multiple_flask(self, defect_list: Iterable[IOVType],
                           tag: str,
                           flask_cool_target: str,
                           flask_auth: Mapping[str, str],
                           flask_db: str,
                           flask_uri: str):
    """
    Upload a list of defect IoVs in a single request to the Flask server.

    Steps:
      1. Request an access token from the CERN auth endpoint, using the
         client-credentials grant plus the entries of `flask_auth`.
      2. Look up the database user name and password for
         `flask_cool_target` with `get_authentication`.
      3. Build the `addIov` request (one entry per IoV, channels resolved
         with `defect_chan_as_id(channel, True)`) and POST it to
         `flask_uri` as a JSON file named iov.json.

    TLS certificate verification is switched off when the environment
    variable DQM_COOL_FLASK_NOVERIFY is set.

    Parameters:
        `defect_list` : the IOV objects to upload
        `tag` : tag written into the request
        `flask_cool_target` : URL-style target; its host name (upper-cased)
                              and path give the server and schema
        `flask_auth` : extra form fields for the token request
        `flask_db` : database instance name written into the request
        `flask_uri` : URI the request is posted to

    Raises:
        RuntimeError if the token request fails or the server does not
        report success.
        ValueError if `flask_cool_target` has no host name.
    """
    import requests
    import json
    import urllib.parse
    import os
    from DQUtils.oracle import get_authentication

    data = {'grant_type': 'client_credentials',
            'audience': 'cool-flask-server'}
    data.update(flask_auth)
    auth = requests.post('https://auth.cern.ch/auth/realms/cern/api-access/token',
                         data=data
                         )
    if not auth:
        raise RuntimeError('Cannot authenticate to Flask server')
    token = auth.json()['access_token']
    # The bearer token is a credential: never write it to the log.
    log.debug('auth succeeded')

    username, password = get_authentication(flask_cool_target)
    p = urllib.parse.urlparse(flask_cool_target)
    if not p.hostname:
        raise ValueError(f'Cannot interpret {flask_cool_target} as a path')
    server, schema = p.hostname.upper(), p.path[1:]

    submit_map = {'cmd': 'addIov',
                  'COOLDEST': flask_cool_target,
                  'ORA_SCHEMA': schema,
                  'ORA_INST': flask_db,
                  'COOLOPTS': 'None',
                  'COOLSINST': flask_db,
                  'ORA_SRV': server,
                  'COOLXP': password,
                  'ORA_USER': username,
                  'cool_data': {
                      'folder': DEFECTS_FOLDER,
                      'description': '',
                      'tag': tag,
                      'record': {'present': 'Bool', 'recoverable': 'Bool',
                                 'user': 'String255', 'comment': 'String255'},
                      'size': 0,
                      'iovs': []
                  }}

    iovs = [{'channel': self.defect_chan_as_id(defect.channel, True),
             'since': defect.since,
             'until': defect.until,
             'payload': {'present': defect.present,
                         'recoverable': defect.recoverable,
                         'user': defect.user,
                         'comment': defect.comment
                         }
             }
            for defect in defect_list]
    submit_map['cool_data']['iovs'] = iovs
    submit_map['cool_data']['size'] = len(iovs)

    r = requests.post(flask_uri,
                      headers={'Authorization': f'Bearer {token}'},
                      files={'file': ('iov.json',
                                      json.dumps({'cool_multi_iov_request': submit_map}))},
                      verify=('DQM_COOL_FLASK_NOVERIFY' not in os.environ)
                      )
    log.debug(r.content)

    # A failed request may come back with a body that is not JSON (for
    # example an error page); that must not hide the real failure.
    try:
        reply = r.json()
    except ValueError:
        reply = None

    if not r or not isinstance(reply, dict) or reply.get('code') != 0:
        if isinstance(reply, dict) and 'message' in reply:
            message = reply['message']
        else:
            message = f'HTTP {r.status_code}: {r.text}'
        raise RuntimeError(f'Unable to upload defects. Flask server returned error:\n{message}')
None _insert(self, Union[str, int] defect_id, Union[int, Tuple[int, int], RunLumi] since, Union[int, Tuple[int, int], RunLumi] until, str comment, str added_by, bool present=True, bool recoverable=False, str tag='HEAD')
None create_defect(self, str name, str description)
_insert_multiple_flask(self, Iterable[IOVType] defect_list, str tag, str flask_cool_target, Mapping[str, str] flask_auth, str flask_db, str flask_uri)
None __init__(self, str connection_string=DEFAULT_CONNECTION_STRING, bool read_only=True, bool create=False, Union[str, Tuple] tag="HEAD")
insert_multiple(self, Iterable[IOVType] defect_list, str tag='HEAD', bool use_flask=False, str flask_cool_target='oracle://ATONR_COOLOFL_GPN/ATLAS_COOLOFL_GLOBAL', Mapping[str, str] flask_auth={}, str flask_db='CONDBR2', str flask_uri='https://cool-proxy-app.cern.ch/cool/multi_iovs')
None _insert_iov(self, IOVType iov, str tag)
None insert(self, Union[str, int] defect_id, int since, int until, str comment, str added_by, bool present=True, bool recoverable=False)
IOVSet retrieve(self, Optional[Union[int, Tuple[int, int], RunLumi]] since=None, Optional[Union[int, Tuple[int, int], RunLumi]] until=None, Optional[Iterable[Union[str, int]]] channels=None, bool nonpresent=False, bool primary_only=False, Optional[Collection[str]] ignore=None, bool with_primary_dependencies=False, bool intersect=False, bool with_time=False, bool evaluate_full=True)
None _create_defect_with_id(self, int did, str name, str description)
cool.IFolder defect_logic_folder(self)
Definition folders.py:100
cool.IFolder defects_folder(self)
Definition folders.py:67
None _new_defect(self, int did, str dname)
Definition ids.py:120
MutableMapping[Union[str, int], Union[str, int]] defect_id_map(self)
Definition ids.py:162
MutableMapping[Union[str, int], Union[str, int]] virtual_defect_id_map(self)
Definition ids.py:193
Union[str, List[str]] normalize_defect_names(self, Union[str, Iterable[str]] defect_id)
Definition ids.py:307
int defect_chan_as_id(self, Union[str, int] channel, bool primary_only=False)
Definition ids.py:202
List[int] defect_names_as_ids(self, Iterable[Union[str, int]] channels)
Definition ids.py:227
Set[str] resolve_primary_defects(self, Iterable[DefectLogic] defect_logics)
List[DefectLogic] _resolve_evaluation_order(self, Optional[Iterable[str]] defects=None)
STL class.
fetch_iovs(folder_name, since=None, until=None, channels=None, tag="", what="all", max_records=-1, with_channel=True, loud=False, database=None, convert_time=False, named_channels=False, selection=None, runs=None, with_time=False, unicode_strings=False)