4 Authors: Peter Waller <peter.waller@cern.ch> and "Peter Onyisi" <peter.onyisi@cern.ch>
6 Please see https://twiki.cern.ch/twiki/bin/viewauth/Atlas/DQDefects
8 This file defines DefectsDB and some of its public interface.
10 To separate out the code into digestible units, this class is split into mixins
11 which can be found defined in other files in this directory.
13 The bulk of the core functionality is found in this file.
17 from logging
import getLogger; log =
getLogger(
"DQDefects.db")
19 from contextlib
import contextmanager
21 from DQUtils
import fetch_iovs, IOVSet
22 from DQUtils.sugar.iovtype
import IOVType
23 from DQUtils.channel_mapping
import list_to_channelselection
24 from DQUtils.sugar
import RunLumi
26 from .
import DEFAULT_CONNECTION_STRING, DEFECTS_FOLDER
28 from .exceptions
import DefectExistsError, DefectUnknownError
29 from .folders
import DefectsDBFoldersMixin
30 from .ids
import DefectsDBIDsNamesMixin, choose_new_defect_id
31 from .tags
import DefectsDBTagsMixin, tagtype
32 from .virtual_mixin
import DefectsDBVirtualDefectsMixin
33 from .virtual_calculator
import calculate_virtual_defects
34 from typing
import Union, Tuple, Optional, Iterable, Collection, Mapping
38 DefectsDBFoldersMixin,
39 DefectsDBIDsNamesMixin,
42 Implementation is split into mixins.
44 DefectsDBIDsNamesMixin:
45 Defines functions for dealing with defect IDs
48 Logic for loading or creating the COOL folder/database
50 DefectsDBTagsMixin:
51 Functions for managing tags
53 DefectsDBVirtualDefectsMixin:
54 Internal code for managing and computing virtual defects
56 Public interface is nominally defined in this class (DefectsDB).
59 def __init__(self, connection_string: str = DEFAULT_CONNECTION_STRING,
60 read_only: bool =
True, create: bool =
False, tag: Union[str, Tuple] =
"HEAD") ->
None:
62 Create a new DefectsDB instance.
64 The COOL folder resides at `DEFECTS_FOLDER`, which is a module-level
65 variable in this module.
68 `connection_string` : A COOL connection string, or a string ending
69 in .db. In the latter case, the database is
70 assumed to be a sqlite database at that path.
71 `read_only` : Indicates whether insertions will be disallowed on
72 this connection. (Default: True)
73 `create` : If True, attempt to create the database if it doesn't
74 exist. (Default: False)
75 `tag` : The COOL tag(s) to use. Either a single string used
76 for both defects and virtual defect logic, or a
77 2-tuple (defecttag, logictag). The default is to
78 use HEAD for both defects and logic. Either
79 folder-level or hierarchical tags can be given.
84 if isinstance(tag, str):
88 tag = tagtype._make(tag)
90 raise TypeError(
'tag argument must be a 2-element sequence')
92 raise TypeError(
'tag argument must be a 2-element sequence')
113 Ideally we would use inheritance to call destructors, but this isn't
114 possible in the general case with the way we (ab)use mixins, so we just
121 Create a new type of defect; tries to figure out system ID from the
122 defect name. See also: `create_defect_with_id`, `new_system_defect`.
125 `name` : name of the defect
126 `description` : a human-readable description of the defect
128 the ID number of the new defect
131 log.info(
"Creating new defect %s: system ID %08x", name, sysid)
136 Create a new type of defect, specifying the defect ID.
137 See also: `create_defect`, `new_system_defect`.
141 `name` : name of the defect
142 `description` : a human-readable description of the defect
144 log.info(
"Creating new defect %s (0x%08x)", name, did)
149 except DefectUnknownError:
153 name.encode(
'ascii'),
154 description.encode(
'utf-8'))
157 def retrieve(self, since: Optional[Union[int, Tuple[int,int], RunLumi]] =
None,
158 until: Optional[Union[int, Tuple[int,int], RunLumi]] =
None,
159 channels: Optional[Iterable[Union[str,int]]] =
None, nonpresent: bool =
False,
160 primary_only: bool =
False, ignore: Optional[Collection[str]] =
None,
161 with_primary_dependencies: bool =
False, intersect: bool =
False,
162 with_time: bool =
False, evaluate_full: bool =
True) -> IOVSet:
164 Retrieve defects from the database.
167 `since`, `until` : IoV range to query (Default: All)
168 `channels` : A list of channels to query. Can contain a mixture of
169 defect names and ids. (Default: None means all
170 channels, including all virtual)
171 `nonpresent` : Only return IoVs which are currently "present"
172 `primary_only` : Only return primary IoVs, no virtual ones.
173 `ignore` : Set of defects which won't be treated as bad.
174 `with_primary_dependencies` : When querying virtual flags, also get
175 primary flags which went into the calculation.
176 `intersect` : Intersect the result with the query range so that no
177 iov spans outside the query range
178 `with_time` : Also retrieves the time the defect was inserted
179 ~2x slower queries, doesn't work for virtual defects
180 `evaluate_full` : If specified, also compute the `comment` and
181 `recoverable` fields of virtual defects.
182 Causes a ~0.6x slowdown
184 if ignore
is not None and not isinstance(ignore, set):
185 raise RuntimeError(
"ignore parameter should be set type")
187 desired_channels =
None
189 if channels
is not None:
191 virtual_channels = query_channels - self.
defect_ids
192 primary_channels = query_channels & self.
defect_ids
197 virtual_channels =
None
201 query_channels =
None
203 primary_output_names = [self.
defect_id_map[pid]
for pid
in primary_channels]
204 virtual_output_names = []
208 assert not primary_only,
"Requested virtual channels with primary_only=True"
209 assert not with_time,
"with_time flag only works for primary defects"
211 for vid
in virtual_channels]
215 if channels
is not None:
220 query_channels = primary_channels
222 if with_primary_dependencies:
223 primary_output_names.extend(
sorted(primary_needed))
225 for logic
in ordered_logics:
226 logic.set_evaluation(evaluate_full)
231 if query_channels
is not None:
232 query_channels =
sorted(query_channels)
235 if len(query_ranges) >= 50:
237 desired_channels =
set(primary_output_names + virtual_output_names)
238 query_channels =
None
243 named_channels=
True, unicode_strings=
True,
247 if primary_only
or not virtual_channels:
248 result = primary_iovs
252 args = (primary_iovs, ordered_logics,
253 virtual_output_names, primary_output_names,
254 since, until, ignore)
260 result = IOVSet(iov
for iov
in result
if iov.present)
265 result = IOVSet(iov
for iov
in result
266 if iov.channel
in desired_channels)
269 result = result.intersect_range((since, until))
276 Gives a new context manager for use with the with statement, e.g.:
277 with ddb.storage_buffer:
278 for d in many_defects:
279 ddb.insert(...defect...)
281 assert not self.
_bad_state,
"Please see comment in DefectsDB constructor"
285 log.debug(
"setupStorageBuffer()")
290 log.warning(
"Exception raised during DefectsDB.storage_buffer. "
291 "Not flushing storage buffer - but COOL has no way "
296 log.debug(
"flushStorageBuffer()")
300 def insert(self, defect_id: Union[str, int], since: int, until: int, comment: str, added_by: str,
301 present: bool =
True, recoverable: bool =
False) ->
None:
303 Insert a new defect into the database.
306 `defect_id` : The name or channel identifier for the defect
307 `since`, `until` : The COOL IoV for the range
308 `comment` : String255 arbitrary text comment
309 `added_by` : The user name or "sys:"-prefixed string of the
310 application that inserted the defect
311 `present` : The state of the flag (Default: True)
312 `recoverable` : Indicates whether there is any possibility to set
313 present=False in the future (Default: False)
315 return self.
_insert(defect_id, since, until, comment, added_by,
320 Helper function for inserting IOV objects, since record order doesn't
321 match function argument order
323 return self.
_insert(iov.channel, iov.since, iov.until, iov.comment,
324 iov.user, iov.present, iov.recoverable, tag)
326 def _insert(self, defect_id: Union[str, int], since: Union[int, Tuple[int, int], RunLumi],
327 until: Union[int, Tuple[int, int], RunLumi], comment: str, added_by: str,
328 present: bool =
True, recoverable: bool =
False, tag: str =
'HEAD') ->
None:
330 Implementation of insert, allows tag specification for internal
333 assert not self.
_read_only,
"Insertion on read-only database"
334 assert not self.
_bad_state,
"Please see comment in DefectsDB constructor"
340 p[
"present"] = present
341 p[
"recoverable"] = recoverable
342 p[
"user"] = added_by.encode(
'utf-8')
343 p[
"comment"] = comment.encode(
'utf-8')
347 store(since, until, p, defect_id, tag.encode(
'ascii'),
348 (
True if tag !=
'HEAD' else False))
351 tag: str =
'HEAD', use_flask: bool =
False,
352 flask_cool_target: str =
'oracle://ATONR_COOLOFL_GPN/ATLAS_COOLOFL_GLOBAL',
353 flask_auth: Mapping[str, str] = {},
354 flask_db: str =
'CONDBR2',
355 flask_uri: str =
'https://cool-proxy-app.cern.ch/cool/multi_iovs'
358 for defect
in defect_list:
363 flask_uri = os.environ.get(
'DQM_COOL_FLASK_URI', flask_uri)
365 log.debug(f
'Flask server URI: {flask_uri}')
367 flask_auth, flask_db, flask_uri)
371 flask_cool_target: str,
372 flask_auth: Mapping[str, str],
379 from DQUtils.oracle
import get_authentication
380 data = {
'grant_type':
'client_credentials',
381 'audience':
'cool-flask-server'}
382 data.update(flask_auth)
383 auth = requests.post(
'https://auth.cern.ch/auth/realms/cern/api-access/token',
387 raise RuntimeError(
'Cannot authenticate to Flask server')
389 token = auth.json()[
'access_token']
390 log.debug(f
'auth succeeded {token}')
392 p = urllib.parse.urlparse(flask_cool_target)
394 raise ValueError(f
'Cannot interpret {flask_cool_target} as a path')
395 server, schema = p.hostname.upper(), p.path[1:]
397 submit_map = {
'cmd':
'addIov',
398 'COOLDEST': flask_cool_target,
399 'ORA_SCHEMA': schema,
400 'ORA_INST': flask_db,
402 'COOLSINST': flask_db,
405 'ORA_USER': username,
407 'folder': DEFECTS_FOLDER,
410 'record': {
'present':
'Bool',
'recoverable':
'Bool',
411 'user':
'String255',
'comment':
'String255'},
416 for defect
in defect_list:
417 submit_map[
'cool_data'][
'iovs'].
append(
419 'since': defect.since,
420 'until': defect.until,
421 'payload': {
'present': defect.present,
422 'recoverable': defect.recoverable,
424 'comment': defect.comment
428 submit_map[
'cool_data'][
'size'] += 1
429 r = requests.post(flask_uri,
430 headers={
'Authorization': f
'Bearer {token}'},
431 files={
'file': (
'iov.json',
432 json.dumps({
'cool_multi_iov_request': submit_map}))},
433 verify=(
'DQM_COOL_FLASK_NOVERIFY' not in os.environ)
436 if not r
or r.json()[
'code'] != 0:
437 raise RuntimeError(f
'Unable to upload defects. Flask server returned error:\n{r.json()["message"]}')