ATLAS Offline Software
DQDefects/python/db.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 """
4 Authors: Peter Waller <peter.waller@cern.ch> and "Peter Onyisi" <peter.onyisi@cern.ch>
5 
6 Please see https://twiki.cern.ch/twiki/bin/viewauth/Atlas/DQDefects
7 
8 This file defines DefectsDB and some of its public interface.
9 
10 To separate out the code into digestable units, this class is split into mixins
11 which can be found defined in other files in this directory.
12 
13 The bulk of the core functionality is found in this file.
14 """
15 
16 
17 from logging import getLogger; log = getLogger("DQDefects.db")
18 
19 from contextlib import contextmanager
20 
21 from DQUtils import fetch_iovs, IOVSet
22 from DQUtils.sugar.iovtype import IOVType
23 from DQUtils.channel_mapping import list_to_channelselection
24 from DQUtils.sugar import RunLumi
25 
26 from . import DEFAULT_CONNECTION_STRING, DEFECTS_FOLDER
27 
28 from .exceptions import DefectExistsError, DefectUnknownError
29 from .folders import DefectsDBFoldersMixin
30 from .ids import DefectsDBIDsNamesMixin, choose_new_defect_id
31 from .tags import DefectsDBTagsMixin, tagtype
32 from .virtual_mixin import DefectsDBVirtualDefectsMixin
33 from .virtual_calculator import calculate_virtual_defects
34 from typing import Union, Tuple, Optional, Iterable, Collection, Mapping
35 
37  DefectsDBTagsMixin,
38  DefectsDBFoldersMixin,
39  DefectsDBIDsNamesMixin,
40  object):
41  """
42  Implementation is split into mixins.
43 
44  DefectsDBIDsNamesMixin:
45  Defines functions for dealing with defect IDs
46 
47  DefectsDBFolderMixin:
48  Logic for loading or creating the COOL folder/database
49 
50  DefectsDBFoldersMixin:
51  Functions for managing tags
52 
53  DefectsDBVirtualDefectsMixin:
54  Internal code for managing and computing virutal defects
55 
56  Public interface is nominally defined in this class (DefectsDB).
57  """
58 
59  def __init__(self, connection_string: str = DEFAULT_CONNECTION_STRING,
60  read_only: bool = True, create: bool = False, tag: Union[str, Tuple] = "HEAD") -> None:
61  """
62  Create a new DefectsDB instance.
63 
64  The COOL folder resides at `DEFECTS_FOLDER`, which is a module-level
65  varaiable in this module.
66 
67  Parameters:
68  `connection_string` : A COOL connection string, or a string ending
69  in .db. In the latter case, the database is
70  assumed to be a sqlite database at that path.
71  `read_only` : Indicates whether insertions will be disallowed on
72  this connection. (Default: True)
73  `create` : If True, attempt to create the database if it doesn't
74  exist. (Default: False)
75  `tag` : The COOL tag(s) to use. Either a single string used
76  for both defects and virtual defect logic, or a
77  2-tuple (defecttag, logictag). The default is to
78  use HEAD for both defects and logic. Either
79  folder-level or hierarchical tags can be given.
80  """
81  self.connection_string = connection_string
82  self._read_only = read_only
83  self._create = create
84  if isinstance(tag, str):
85  self._tag = tagtype(tag, tag) if tag else tagtype("HEAD", "HEAD")
86  else:
87  try:
88  tag = tagtype._make(tag)
89  except Exception:
90  raise TypeError('tag argument must be a 2-element sequence')
91  if len(tag) != 2:
92  raise TypeError('tag argument must be a 2-element sequence')
93  self._tag = tag
94  self._tag = tagtype(self._tag[0],
95  self._tag[1])
96 
97  # COOL has no way of emptying a storage buffer. Creating a new storage
98  # buffer flushes the old one. Therefore, if an exception happens
99  # halfway through setting up a storage buffer, subsequent operations
100  # risk flushing a half-completed operation. In that case, we refuse to
101  # do any more write operations with that database connection.
102  self._bad_state = False
103 
104  super(DefectsDB, self).__init__()
105 
106  if create:
107  # Trigger creation of folders through properties
108  self.defects_folder
110 
111  def __del__(self) -> None:
112  """
113  Ideally we would use inheritance to call destructors, but this isn't
114  possible in the general case with the way we (ab)use mixins, so we just
115  do it here.
116  """
117  self._clear_connections()
118 
119  def create_defect(self, name: str, description: str) -> None:
120  """
121  Create a new type of defect; tries to figure out system ID from the
122  defect name. See also: `create_defect_with_id`, `new_system_defect`.
123 
124  Parameters:
125  `name` : name of the defect
126  `description` : a human-readable description of the defect
127  Returns:
128  the ID number of the new defect
129  """
130  sysid = choose_new_defect_id(self.defect_id_map, name)
131  log.info("Creating new defect %s: system ID %08x", name, sysid)
132  self._create_defect_with_id(sysid, name, description)
133 
134  def _create_defect_with_id(self, did: int, name: str, description: str) -> None:
135  """
136  Create a new type of defect, specifying the defect ID.
137  See also: `create_defect`, `new_system_defect`.
138 
139  Parameters:
140  `did` : defect_id
141  `name` : name of the defect
142  `description` : a human-readable description of the defect
143  """
144  log.info("Creating new defect %s (0x%08x)", name, did)
145  if did in self.defect_ids: raise DefectExistsError(did)
146  try:
147  oldname = self.normalize_defect_names(name)
148  raise DefectExistsError(f'Defect {oldname} already exists')
149  except DefectUnknownError:
150  pass
151 
152  self.defects_folder.createChannel(did,
153  name.encode('ascii'),
154  description.encode('utf-8'))
155  self._new_defect(did, name)
156 
157  def retrieve(self, since: Optional[Union[int, Tuple[int,int], RunLumi]] = None,
158  until: Optional[Union[int, Tuple[int,int], RunLumi]] = None,
159  channels: Optional[Iterable[Union[str,int]]] = None, nonpresent: bool = False,
160  primary_only: bool = False, ignore: Optional[Collection[str]] = None,
161  with_primary_dependencies: bool = False, intersect: bool = False,
162  with_time: bool = False, evaluate_full: bool = True) -> IOVSet:
163  """
164  Retrieve defects from the database.
165 
166  Parameters:
167  `since`, `until` : IoV range to query (Default: All)
168  `channels` : A list of channels to query. Can contain a mixture of
169  defect names and ids. (Default: None means all
170  channels, including all virtual)
171  `nonpresent` : Only return IoVs which are currently "present"
172  `primary_only` : Only return primary IoVs, no virtual ones.
173  `ignore` : Set of defects which won't be treated as bad.
174  `with_primary_dependencies` : When querying virtual flags, also get
175  primary flags which went into the calculation.
176  `intersect` : Intersect the result with the query range so that no
177  iov spans outside the query range
178  `with_time` : Also retrieves the time the defect was inserted
179  ~2x slower queries, doesn't work for virtual defects
180  `evaluate_full` : If specified, also compute the `comment` and
181  `recoverable` fields of virtual defects.
182  Causes a ~0.6x slowdown
183  """
184  if ignore is not None and not isinstance(ignore, set):
185  raise RuntimeError("ignore parameter should be set type")
186 
187  desired_channels = None
188  # Figure out the IDs of channels to query and their virtuality
189  if channels is not None:
190  query_channels = set(self.defect_names_as_ids(channels))
191  virtual_channels = query_channels - self.defect_ids
192  primary_channels = query_channels & self.defect_ids
193  else:
194  # Empty channels list means query all channels
195  # (including all virtual)
196  if primary_only:
197  virtual_channels = None
198  else:
199  virtual_channels = self.virtual_defect_ids
200  primary_channels = set(self.defect_ids)
201  query_channels = None # (all)
202 
203  primary_output_names = [self.defect_id_map[pid] for pid in primary_channels]
204  virtual_output_names = [] # (optionally populated below)
205 
206  # Resolve virtual channels to query here, and the primary dependents.
207  if virtual_channels:
208  assert not primary_only, "Requested virtual channels with primary_only=True"
209  assert not with_time, "with_time flag only works for primary defects"
210  virtual_output_names = [self.virtual_defect_id_map[vid]
211  for vid in virtual_channels]
212 
213  ordered_logics = self._resolve_evaluation_order(virtual_output_names)
214 
215  if channels is not None:
216  # Since not all channels are being queried, it is necessary to
217  # add the desired primary channels to the query
218  primary_needed = self.resolve_primary_defects(ordered_logics)
219  primary_channels |= set(self.defect_names_as_ids(primary_needed))
220  query_channels = primary_channels
221 
222  if with_primary_dependencies:
223  primary_output_names.extend(sorted(primary_needed))
224 
225  for logic in ordered_logics:
226  logic.set_evaluation(evaluate_full)
227  else:
228  ordered_logics = []
229 
230  # Figure out if the set of channels will produce too many ranges for COOL
231  if query_channels is not None:
232  query_channels = sorted(query_channels)
233  query_ranges = list_to_channelselection(query_channels, None, True)
234 
235  if len(query_ranges) >= 50:
236  # We're querying too many ranges. Query everything, filter later.
237  desired_channels = set(primary_output_names + virtual_output_names)
238  query_channels = None # (everything)
239 
240  # Retrieve primary IoVs
241  primary_iovs = fetch_iovs(self.defects_folder, since, until,
242  query_channels, self.defects_tag,
243  named_channels=True, unicode_strings=True,
244  with_time=with_time)
245 
246  # Calculate virtual defects (if necessary)
247  if primary_only or not virtual_channels:
248  result = primary_iovs
249  else:
250  if not primary_iovs:
251  return IOVSet()
252  args = (primary_iovs, ordered_logics,
253  virtual_output_names, primary_output_names,
254  since, until, ignore)
255  result = calculate_virtual_defects(*args)
256 
257  # Filter out results which have their present bit removed
258  # (unless otherwise specified)
259  if not nonpresent:
260  result = IOVSet(iov for iov in result if iov.present)
261 
262  # Filter out channels which weren't queried by the user
263  # (to get around 50 channelselection COOL limit)
264  if desired_channels:
265  result = IOVSet(iov for iov in result
266  if iov.channel in desired_channels)
267 
268  if intersect:
269  result = result.intersect_range((since, until))
270 
271  return result
272 
273  @property
274  def storage_buffer(self):
275  """
276  Gives a new context manager for use with the with statement, e.g.:
277  with ddb.storage_buffer:
278  for d in many_defects:
279  ddb.insert(...defect...)
280  """
281  assert not self._bad_state, "Please see comment in DefectsDB constructor"
282 
283  @contextmanager
284  def thunk():
285  log.debug("setupStorageBuffer()")
286  self.defects_folder.setupStorageBuffer()
287  try:
288  yield
289  except Exception:
290  log.warning("Exception raised during DefectsDB.storage_buffer. "
291  "Not flushing storage buffer - but COOL has no way "
292  "to empty it. ")
293  self._bad_state = True
294  raise
295  else:
296  log.debug("flushStorageBuffer()")
297  self.defects_folder.flushStorageBuffer()
298  return thunk()
299 
300  def insert(self, defect_id: Union[str, int], since: int, until: int, comment: str, added_by: str,
301  present: bool = True, recoverable: bool = False) -> None:
302  """
303  Insert a new defect into the database.
304 
305  Parameters:
306  `defect_id` : The name or channel identifier for the deect
307  `since`, `until` : The COOL IoV for the range
308  `comment` : String255 arbitrary text comment
309  `added_by` : The user name or "sys:"-prefixed string of the
310  application that inserted the defect
311  `present` : The state of the flag (Default: True)
312  `recoverable` : Indicates whether there is any possibility to set
313  present=False in the future (Default: False)
314  """
315  return self._insert(defect_id, since, until, comment, added_by,
316  present, recoverable, self.defects_tag)
317 
318  def _insert_iov(self, iov: IOVType, tag: str) -> None:
319  """
320  Helper function for inserting IOV objects, since record order doesn't
321  match function argument order
322  """
323  return self._insert(iov.channel, iov.since, iov.until, iov.comment,
324  iov.user, iov.present, iov.recoverable, tag)
325 
326  def _insert(self, defect_id: Union[str, int], since: Union[int, Tuple[int, int], RunLumi],
327  until: Union[int, Tuple[int, int], RunLumi], comment: str, added_by: str,
328  present: bool = True, recoverable: bool = False, tag: str = 'HEAD') -> None:
329  """
330  Implementation of insert, allows tag specification for internal
331  functions
332  """
333  assert not self._read_only, "Insertion on read-only database"
334  assert not self._bad_state, "Please see comment in DefectsDB constructor"
335 
336  # Force load of defects_folder to populate _defect_payload
337  store = self.defects_folder.storeObject
338  p = self._defect_payload
339 
340  p["present"] = present
341  p["recoverable"] = recoverable
342  p["user"] = added_by.encode('utf-8')
343  p["comment"] = comment.encode('utf-8')
344 
345  defect_id = self.defect_chan_as_id(defect_id, True)
346 
347  store(since, until, p, defect_id, tag.encode('ascii'),
348  (True if tag != 'HEAD' else False))
349 
350  def insert_multiple(self, defect_list: Iterable[IOVType],
351  tag: str = 'HEAD', use_flask: bool = False,
352  flask_cool_target: str = 'oracle://ATONR_COOLOFL_GPN/ATLAS_COOLOFL_GLOBAL',
353  flask_auth: Mapping[str, str] = {},
354  flask_db: str = 'CONDBR2',
355  flask_uri: str = 'https://cool-proxy-app.cern.ch/cool/multi_iovs'
356  ):
357  if not use_flask:
358  for defect in defect_list:
359  self._insert_iov(defect, tag)
360  else:
361  # Allow for override from environment
362  import os
363  flask_uri = os.environ.get('DQM_COOL_FLASK_URI', flask_uri)
364 
365  log.debug(f'Flask server URI: {flask_uri}')
366  self._insert_multiple_flask(defect_list, tag, flask_cool_target,
367  flask_auth, flask_db, flask_uri)
368 
369  def _insert_multiple_flask(self, defect_list: Iterable[IOVType],
370  tag: str,
371  flask_cool_target: str,
372  flask_auth: Mapping[str, str],
373  flask_db: str,
374  flask_uri: str):
375  import requests
376  import json
377  import urllib.parse
378  import os
379  from DQUtils.oracle import get_authentication
380  data = {'grant_type':'client_credentials',
381  'audience': 'cool-flask-server'}
382  data.update(flask_auth)
383  auth = requests.post('https://auth.cern.ch/auth/realms/cern/api-access/token',
384  data=data
385  )
386  if not auth:
387  raise RuntimeError('Cannot authenticate to Flask server')
388  else:
389  token = auth.json()['access_token']
390  log.debug(f'auth succeeded {token}')
391  username, password = get_authentication(flask_cool_target)
392  p = urllib.parse.urlparse(flask_cool_target)
393  if not p.hostname:
394  raise ValueError(f'Cannot interpret {flask_cool_target} as a path')
395  server, schema = p.hostname.upper(), p.path[1:]
396 
397  submit_map = {'cmd': 'addIov',
398  'COOLDEST': flask_cool_target,
399  'ORA_SCHEMA': schema,
400  'ORA_INST': flask_db,
401  'COOLOPTS': 'None',
402  'COOLSINST': flask_db,
403  'ORA_SRV': server,
404  'COOLXP': password,
405  'ORA_USER': username,
406  'cool_data': {
407  'folder': DEFECTS_FOLDER,
408  'description': '',
409  'tag': tag,
410  'record': {'present': 'Bool', 'recoverable': 'Bool',
411  'user': 'String255', 'comment': 'String255'},
412  'size': 0,
413  'iovs': []
414  }}
415 
416  for defect in defect_list:
417  submit_map['cool_data']['iovs'].append(
418  {'channel': self.defect_chan_as_id(defect.channel, True),
419  'since': defect.since,
420  'until': defect.until,
421  'payload': {'present': defect.present,
422  'recoverable': defect.recoverable,
423  'user': defect.user,
424  'comment': defect.comment
425  }
426  }
427  )
428  submit_map['cool_data']['size'] += 1
429  r = requests.post(flask_uri,
430  headers={'Authorization': f'Bearer {token}'},
431  files={'file': ('iov.json',
432  json.dumps({'cool_multi_iov_request': submit_map}))},
433  verify=('DQM_COOL_FLASK_NOVERIFY' not in os.environ)
434  )
435  log.debug(r.content)
436  if not r or r.json()['code'] != 0:
437  raise RuntimeError(f'Unable to upload defects. Flask server returned error:\n{r.json()["message"]}')
python.db.DefectsDB._tag
_tag
Definition: DQDefects/python/db.py:84
DerivationFramework::TriggerMatchingUtils::sorted
std::vector< typename R::value_type > sorted(const R &r, PROJ proj={})
Helper function to create a sorted vector from an unsorted range.
SGTest::store
TestStore store
Definition: TestStore.cxx:23
python.db.DefectsDB.connection_string
connection_string
Definition: DQDefects/python/db.py:80
python.db.DefectsDB.insert
None insert(self, Union[str, int] defect_id, int since, int until, str comment, str added_by, bool present=True, bool recoverable=False)
Definition: DQDefects/python/db.py:300
python.folders.DefectsDBFoldersMixin._clear_connections
None _clear_connections(self)
Definition: folders.py:29
python.virtual_calculator.calculate_virtual_defects
IOVSet calculate_virtual_defects(IOVSet primary_iovs, Iterable[DefectLogic] evaluation_order, Iterable[str] virtual_output_channels, Iterable[str] primary_output_channels, Optional[Union[int, Tuple[int, int], RunLumi]] since, Optional[Union[int, Tuple[int, int], RunLumi]] until, Optional[Container[str]] ignore)
Definition: virtual_calculator.py:64
python.ids.DefectsDBIDsNamesMixin.virtual_defect_id_map
MutableMapping[Union[str, int], Union[str, int]] virtual_defect_id_map(self)
Definition: ids.py:193
python.db.fetch_iovs
def fetch_iovs(folder_name, since=None, until=None, channels=None, tag="", what="all", max_records=-1, with_channel=True, loud=False, database=None, convert_time=False, named_channels=False, selection=None, runs=None, with_time=False, unicode_strings=False)
Definition: DQUtils/python/db.py:65
python.db.DefectsDB.__del__
None __del__(self)
Definition: DQDefects/python/db.py:111
python.ids.DefectsDBIDsNamesMixin.virtual_defect_ids
Set[int] virtual_defect_ids(self)
Definition: ids.py:173
python.ids.DefectsDBIDsNamesMixin.defect_ids
Set[int] defect_ids(self)
Definition: ids.py:141
python.ids.DefectsDBIDsNamesMixin._new_defect
None _new_defect(self, int did, str dname)
Definition: ids.py:120
python.folders.DefectsDBFoldersMixin._defect_payload
_defect_payload
Definition: folders.py:80
python.ids.DefectsDBIDsNamesMixin.defect_names_as_ids
List[int] defect_names_as_ids(self, Iterable[Union[str, int]] channels)
Definition: ids.py:227
python.tags.DefectsDBTagsMixin.defects_tag
str defects_tag(self)
Definition: tags.py:119
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.db.DefectsDB.__init__
None __init__(self, str connection_string=DEFAULT_CONNECTION_STRING, bool read_only=True, bool create=False, Union[str, Tuple] tag="HEAD")
Definition: DQDefects/python/db.py:59
python.exceptions.DefectExistsError
Definition: exceptions.py:3
python.db.DefectsDB
Definition: DQDefects/python/db.py:40
python.db.DefectsDB.insert_multiple
def insert_multiple(self, Iterable[IOVType] defect_list, str tag='HEAD', bool use_flask=False, str flask_cool_target='oracle://ATONR_COOLOFL_GPN/ATLAS_COOLOFL_GLOBAL', Mapping[str, str] flask_auth={}, str flask_db='CONDBR2', str flask_uri='https://cool-proxy-app.cern.ch/cool/multi_iovs')
Definition: DQDefects/python/db.py:350
python.db.DefectsDB.storage_buffer
def storage_buffer(self)
Definition: DQDefects/python/db.py:274
python.ids.DefectsDBIDsNamesMixin.defect_chan_as_id
int defect_chan_as_id(self, Union[str, int] channel, bool primary_only=False)
Definition: ids.py:202
python.virtual_mixin.DefectsDBVirtualDefectsMixin
Definition: virtual_mixin.py:39
python.virtual_mixin.DefectsDBVirtualDefectsMixin._resolve_evaluation_order
List[DefectLogic] _resolve_evaluation_order(self, Optional[Iterable[str]] defects=None)
Definition: virtual_mixin.py:183
python.db.DefectsDB.create_defect
None create_defect(self, str name, str description)
Definition: DQDefects/python/db.py:119
python.ids.DefectsDBIDsNamesMixin.normalize_defect_names
Union[str, List[str]] normalize_defect_names(self, Union[str, Iterable[str]] defect_id)
Definition: ids.py:307
python.db.DefectsDB._create_defect_with_id
None _create_defect_with_id(self, int did, str name, str description)
Definition: DQDefects/python/db.py:134
python.virtual_mixin.DefectsDBVirtualDefectsMixin.resolve_primary_defects
Set[str] resolve_primary_defects(self, Iterable[DefectLogic] defect_logics)
Definition: virtual_mixin.py:208
python.ids.DefectsDBIDsNamesMixin.defect_id_map
MutableMapping[Union[str, int], Union[str, int]] defect_id_map(self)
Definition: ids.py:162
python.oracle.get_authentication
def get_authentication(connection="oracle://ATLAS_COOLPROD/ATLAS_COOLONL_GLOBAL")
Definition: oracle.py:38
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
python.db.DefectsDB.retrieve
IOVSet retrieve(self, Optional[Union[int, Tuple[int, int], RunLumi]] since=None, Optional[Union[int, Tuple[int, int], RunLumi]] until=None, Optional[Iterable[Union[str, int]]] channels=None, bool nonpresent=False, bool primary_only=False, Optional[Collection[str]] ignore=None, bool with_primary_dependencies=False, bool intersect=False, bool with_time=False, bool evaluate_full=True)
Definition: DQDefects/python/db.py:157
python.db.DefectsDB._insert_iov
None _insert_iov(self, IOVType iov, str tag)
Definition: DQDefects/python/db.py:318
python.channel_mapping.list_to_channelselection
def list_to_channelselection(list_, convert_channel=convert_channel, as_list=False)
Definition: channel_mapping.py:39
python.db.DefectsDB._insert_multiple_flask
def _insert_multiple_flask(self, Iterable[IOVType] defect_list, str tag, str flask_cool_target, Mapping[str, str] flask_auth, str flask_db, str flask_uri)
Definition: DQDefects/python/db.py:369
python.folders.DefectsDBFoldersMixin.defect_logic_folder
cool.IFolder defect_logic_folder(self)
Definition: folders.py:100
python.db.DefectsDB._insert
None _insert(self, Union[str, int] defect_id, Union[int, Tuple[int, int], RunLumi] since, Union[int, Tuple[int, int], RunLumi] until, str comment, str added_by, bool present=True, bool recoverable=False, str tag='HEAD')
Definition: DQDefects/python/db.py:326
python.db.DefectsDB._read_only
_read_only
Definition: DQDefects/python/db.py:81
python.ids.choose_new_defect_id
int choose_new_defect_id(Mapping[Union[str, int], Union[str, int]] existing_map, str defect_name, bool virtual=False)
Definition: ids.py:47
python.db.DefectsDB._bad_state
_bad_state
Definition: DQDefects/python/db.py:101
python.folders.DefectsDBFoldersMixin.defects_folder
cool.IFolder defects_folder(self)
Definition: folders.py:67
python.CaloCondLogger.getLogger
def getLogger(name="CaloCond")
Definition: CaloCondLogger.py:16
python.db.DefectsDB._create
_create
Definition: DQDefects/python/db.py:82
python.tags.tagtype
tagtype
Definition: tags.py:18