4 Authors: Peter Waller <peter.waller@cern.ch> and "Peter Onyisi" <peter.onyisi@cern.ch>
6 Please see https://twiki.cern.ch/twiki/bin/viewauth/Atlas/DQDefects
8 This file defines DefectsDB and some of its public interface.
10 To separate out the code into digestable units, this class is split into mixins
11 which can be found defined in other files in this directory.
13 The bulk of the core functionality is found in this file.
17 from logging
import getLogger; log =
getLogger(
"DQDefects.db")
19 from contextlib
import contextmanager
21 from DQUtils
import fetch_iovs, IOVSet
22 from DQUtils.sugar.iovtype
import IOVType
23 from DQUtils.channel_mapping
import list_to_channelselection
24 from DQUtils.sugar
import RunLumi
26 from .
import DEFAULT_CONNECTION_STRING, DEFECTS_FOLDER
28 from .exceptions
import DefectExistsError, DefectUnknownError
29 from .folders
import DefectsDBFoldersMixin
30 from .ids
import DefectsDBIDsNamesMixin, choose_new_defect_id
31 from .tags
import DefectsDBTagsMixin, tagtype
32 from .virtual_mixin
import DefectsDBVirtualDefectsMixin
33 from .virtual_calculator
import calculate_virtual_defects
34 from typing
import Union, Tuple, Optional, Iterable, Collection, Mapping
40 DefectsDBFoldersMixin,
41 DefectsDBIDsNamesMixin,
44 Implementation is split into mixins.
46 DefectsDBIDsNamesMixin:
47 Defines functions for dealing with defect IDs
50 Logic for loading or creating the COOL folder/database
52 DefectsDBFoldersMixin:
53 Functions for managing tags
55 DefectsDBVirtualDefectsMixin:
56 Internal code for managing and computing virutal defects
58 Public interface is nominally defined in this class (DefectsDB).
61 def __init__(self, connection_string: str = DEFAULT_CONNECTION_STRING,
62 read_only: bool =
True, create: bool =
False, tag: Union[str, Tuple] =
"HEAD") ->
None:
64 Create a new DefectsDB instance.
66 The COOL folder resides at `DEFECTS_FOLDER`, which is a module-level
67 varaiable in this module.
70 `connection_string` : A COOL connection string, or a string ending
71 in .db. In the latter case, the database is
72 assumed to be a sqlite database at that path.
73 `read_only` : Indicates whether insertions will be disallowed on
74 this connection. (Default: True)
75 `create` : If True, attempt to create the database if it doesn't
76 exist. (Default: False)
77 `tag` : The COOL tag(s) to use. Either a single string used
78 for both defects and virtual defect logic, or a
79 2-tuple (defecttag, logictag). The default is to
80 use HEAD for both defects and logic. Either
81 folder-level or hierarchical tags can be given.
86 if isinstance(tag, six.string_types):
90 tag = tagtype._make(tag)
92 raise TypeError(
'tag argument must be a 2-element sequence')
94 raise TypeError(
'tag argument must be a 2-element sequence')
115 Ideally we would use inheritance to call destructors, but this isn't
116 possible in the general case with the way we (ab)use mixins, so we just
123 Create a new type of defect; tries to figure out system ID from the
124 defect name. See also: `create_defect_with_id`, `new_system_defect`.
127 `name` : name of the defect
128 `description` : a human-readable description of the defect
130 the ID number of the new defect
133 log.info(
"Creating new defect %s: system ID %08x", name, sysid)
138 Create a new type of defect, specifying the defect ID.
139 See also: `create_defect`, `new_system_defect`.
143 `name` : name of the defect
144 `description` : a human-readable description of the defect
146 log.info(
"Creating new defect %s (0x%08x)", name, did)
151 except DefectUnknownError:
155 name.encode(
'ascii'),
156 description.encode(
'utf-8'))
159 def retrieve(self, since: Optional[Union[int, Tuple[int,int], RunLumi]] =
None,
160 until: Optional[Union[int, Tuple[int,int], RunLumi]] =
None,
161 channels: Optional[Iterable[Union[str,int]]] =
None, nonpresent: bool =
False,
162 primary_only: bool =
False, ignore: Optional[Collection[str]] =
None,
163 with_primary_dependencies: bool =
False, intersect: bool =
False,
164 with_time: bool =
False, evaluate_full: bool =
True) -> IOVSet:
166 Retrieve defects from the database.
169 `since`, `until` : IoV range to query (Default: All)
170 `channels` : A list of channels to query. Can contain a mixture of
171 defect names and ids. (Default: None means all
172 channels, including all virtual)
173 `nonpresent` : Only return IoVs which are currently "present"
174 `primary_only` : Only return primary IoVs, no virtual ones.
175 `ignore` : Set of defects which won't be treated as bad.
176 `with_primary_dependencies` : When querying virtual flags, also get
177 primary flags which went into the calculation.
178 `intersect` : Intersect the result with the query range so that no
179 iov spans outside the query range
180 `with_time` : Also retrieves the time the defect was inserted
181 ~2x slower queries, doesn't work for virtual defects
182 `evaluate_full` : If specified, also compute the `comment` and
183 `recoverable` fields of virtual defects.
184 Causes a ~0.6x slowdown
186 if ignore
is not None and not isinstance(ignore, set):
187 raise RuntimeError(
"ignore parameter should be set type")
189 desired_channels =
None
191 if channels
is not None:
193 virtual_channels = query_channels - self.
defect_ids
194 primary_channels = query_channels & self.
defect_ids
199 virtual_channels =
None
203 query_channels =
None
205 primary_output_names = [self.
defect_id_map[pid]
for pid
in primary_channels]
206 virtual_output_names = []
210 assert not primary_only,
"Requested virtual channels with primary_only=True"
211 assert not with_time,
"with_time flag only works for primary defects"
213 for vid
in virtual_channels]
217 if channels
is not None:
222 query_channels = primary_channels
224 if with_primary_dependencies:
225 primary_output_names.extend(
sorted(primary_needed))
227 for logic
in ordered_logics:
228 logic.set_evaluation(evaluate_full)
233 if query_channels
is not None:
234 query_channels =
sorted(query_channels)
237 if len(query_ranges) >= 50:
239 desired_channels =
set(primary_output_names + virtual_output_names)
240 query_channels =
None
245 named_channels=
True, unicode_strings=
True,
249 if primary_only
or not virtual_channels:
250 result = primary_iovs
254 args = (primary_iovs, ordered_logics,
255 virtual_output_names, primary_output_names,
256 since, until, ignore)
262 result = IOVSet(iov
for iov
in result
if iov.present)
267 result = IOVSet(iov
for iov
in result
268 if iov.channel
in desired_channels)
271 result = result.intersect_range((since, until))
278 Gives a new context manager for use with the with statement, e.g.:
279 with ddb.storage_buffer:
280 for d in many_defects:
281 ddb.insert(...defect...)
283 assert not self.
_bad_state,
"Please see comment in DefectsDB constructor"
287 log.debug(
"setupStorageBuffer()")
292 log.warning(
"Exception raised during DefectsDB.storage_buffer. "
293 "Not flushing storage buffer - but COOL has no way "
298 log.debug(
"flushStorageBuffer()")
302 def insert(self, defect_id: Union[str, int], since: int, until: int, comment: str, added_by: str,
303 present: bool =
True, recoverable: bool =
False) ->
None:
305 Insert a new defect into the database.
308 `defect_id` : The name or channel identifier for the deect
309 `since`, `until` : The COOL IoV for the range
310 `comment` : String255 arbitrary text comment
311 `added_by` : The user name or "sys:"-prefixed string of the
312 application that inserted the defect
313 `present` : The state of the flag (Default: True)
314 `recoverable` : Indicates whether there is any possibility to set
315 present=False in the future (Default: False)
317 return self.
_insert(defect_id, since, until, comment, added_by,
322 Helper function for inserting IOV objects, since record order doesn't
323 match function argument order
325 return self.
_insert(iov.channel, iov.since, iov.until, iov.comment,
326 iov.user, iov.present, iov.recoverable, tag)
328 def _insert(self, defect_id: Union[str, int], since: Union[int, Tuple[int, int], RunLumi],
329 until: Union[int, Tuple[int, int], RunLumi], comment: str, added_by: str,
330 present: bool =
True, recoverable: bool =
False, tag: str =
'HEAD') ->
None:
332 Implementation of insert, allows tag specification for internal
335 assert not self.
_read_only,
"Insertion on read-only database"
336 assert not self.
_bad_state,
"Please see comment in DefectsDB constructor"
342 p[
"present"] = present
343 p[
"recoverable"] = recoverable
344 p[
"user"] = added_by.encode(
'utf-8')
345 p[
"comment"] = comment.encode(
'utf-8')
349 store(since, until, p, defect_id, tag.encode(
'ascii'),
350 (
True if tag !=
'HEAD' else False))
353 tag: str =
'HEAD', use_flask: bool =
False,
354 flask_cool_target: str =
'oracle://ATONR_COOLOFL_GPN/ATLAS_COOLOFL_GLOBAL',
355 flask_auth: Mapping[str, str] = {},
356 flask_db: str =
'CONDBR2',
357 flask_uri: str =
'https://cool-proxy-app.cern.ch/cool/multi_iovs'
360 for defect
in defect_list:
365 flask_uri = os.environ.get(
'DQM_COOL_FLASK_URI', flask_uri)
367 log.debug(f
'Flask server URI: {flask_uri}')
369 flask_auth, flask_db, flask_uri)
373 flask_cool_target: str,
374 flask_auth: Mapping[str, str],
381 from DQUtils.oracle
import get_authentication
382 data = {
'grant_type':
'client_credentials',
383 'audience':
'cool-flask-server'}
384 data.update(flask_auth)
385 auth = requests.post(
'https://auth.cern.ch/auth/realms/cern/api-access/token',
389 raise RuntimeError(
'Cannot authenticate to Flask server')
391 token = auth.json()[
'access_token']
392 log.debug(f
'auth succeeded {token}')
394 p = urllib.parse.urlparse(flask_cool_target)
396 raise ValueError(f
'Cannot interpret {flask_cool_target} as a path')
397 server, schema = p.hostname.upper(), p.path[1:]
399 submit_map = {
'cmd':
'addIov',
400 'COOLDEST': flask_cool_target,
401 'ORA_SCHEMA': schema,
402 'ORA_INST': flask_db,
404 'COOLSINST': flask_db,
407 'ORA_USER': username,
409 'folder': DEFECTS_FOLDER,
412 'record': {
'present':
'Bool',
'recoverable':
'Bool',
413 'user':
'String255',
'comment':
'String255'},
418 for defect
in defect_list:
419 submit_map[
'cool_data'][
'iovs'].
append(
421 'since': defect.since,
422 'until': defect.until,
423 'payload': {
'present': defect.present,
424 'recoverable': defect.recoverable,
426 'comment': defect.comment
430 submit_map[
'cool_data'][
'size'] += 1
431 r = requests.post(flask_uri,
432 headers={
'Authorization': f
'Bearer {token}'},
433 files={
'file': (
'iov.json',
434 json.dumps({
'cool_multi_iov_request': submit_map}))},
435 verify=(
'DQM_COOL_FLASK_NOVERIFY' not in os.environ)
438 if not r
or r.json()[
'code'] != 0:
439 raise RuntimeError(f
'Unable to upload defects. Flask server returned error:\n{r.json()["message"]}')