ATLAS Offline Software
DQDefects/python/db.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 """
4 Authors: Peter Waller <peter.waller@cern.ch> and "Peter Onyisi" <peter.onyisi@cern.ch>
5 
6 Please see https://twiki.cern.ch/twiki/bin/viewauth/Atlas/DQDefects
7 
8 This file defines DefectsDB and some of its public interface.
9 
10 To separate out the code into digestible units, this class is split into mixins
11 which can be found defined in other files in this directory.
12 
13 The bulk of the core functionality is found in this file.
14 """
15 
16 
17 from logging import getLogger; log = getLogger("DQDefects.db")
18 
19 from contextlib import contextmanager
20 
21 from DQUtils import fetch_iovs, IOVSet
22 from DQUtils.sugar.iovtype import IOVType
23 from DQUtils.channel_mapping import list_to_channelselection
24 from DQUtils.sugar import RunLumi
25 
26 from . import DEFAULT_CONNECTION_STRING, DEFECTS_FOLDER
27 
28 from .exceptions import DefectExistsError, DefectUnknownError
29 from .folders import DefectsDBFoldersMixin
30 from .ids import DefectsDBIDsNamesMixin, choose_new_defect_id
31 from .tags import DefectsDBTagsMixin, tagtype
32 from .virtual_mixin import DefectsDBVirtualDefectsMixin
33 from .virtual_calculator import calculate_virtual_defects
34 from typing import Union, Tuple, Optional, Iterable, Collection, Mapping
35 
36 import six
37 
39  DefectsDBTagsMixin,
40  DefectsDBFoldersMixin,
41  DefectsDBIDsNamesMixin,
42  object):
43  """
44  Implementation is split into mixins.
45 
46  DefectsDBIDsNamesMixin:
47  Defines functions for dealing with defect IDs
48 
49  DefectsDBFoldersMixin:
50  Logic for loading or creating the COOL folder/database
51 
52  DefectsDBTagsMixin:
53  Functions for managing tags
54 
55  DefectsDBVirtualDefectsMixin:
56  Internal code for managing and computing virtual defects
57 
58  Public interface is nominally defined in this class (DefectsDB).
59  """
60 
61  def __init__(self, connection_string: str = DEFAULT_CONNECTION_STRING,
62  read_only: bool = True, create: bool = False, tag: Union[str, Tuple] = "HEAD") -> None:
63  """
64  Create a new DefectsDB instance.
65 
66  The COOL folder resides at `DEFECTS_FOLDER`, which is a module-level
67  varaiable in this module.
68 
69  Parameters:
70  `connection_string` : A COOL connection string, or a string ending
71  in .db. In the latter case, the database is
72  assumed to be a sqlite database at that path.
73  `read_only` : Indicates whether insertions will be disallowed on
74  this connection. (Default: True)
75  `create` : If True, attempt to create the database if it doesn't
76  exist. (Default: False)
77  `tag` : The COOL tag(s) to use. Either a single string used
78  for both defects and virtual defect logic, or a
79  2-tuple (defecttag, logictag). The default is to
80  use HEAD for both defects and logic. Either
81  folder-level or hierarchical tags can be given.
82  """
83  self.connection_string = connection_string
84  self._read_only = read_only
85  self._create = create
86  if isinstance(tag, six.string_types):
87  self._tag = tagtype(tag, tag) if tag else tagtype("HEAD", "HEAD")
88  else:
89  try:
90  tag = tagtype._make(tag)
91  except Exception:
92  raise TypeError('tag argument must be a 2-element sequence')
93  if len(tag) != 2:
94  raise TypeError('tag argument must be a 2-element sequence')
95  self._tag = tag
96  self._tag = tagtype(self._tag[0],
97  self._tag[1])
98 
99  # COOL has no way of emptying a storage buffer. Creating a new storage
100  # buffer flushes the old one. Therefore, if an exception happens
101  # halfway through setting up a storage buffer, subsequent operations
102  # risk flushing a half-completed operation. In that case, we refuse to
103  # do any more write operations with that database connection.
104  self._bad_state = False
105 
106  super(DefectsDB, self).__init__()
107 
108  if create:
109  # Trigger creation of folders through properties
110  self.defects_folder
112 
113  def __del__(self) -> None:
114  """
115  Ideally we would use inheritance to call destructors, but this isn't
116  possible in the general case with the way we (ab)use mixins, so we just
117  do it here.
118  """
119  self._clear_connections()
120 
121  def create_defect(self, name: str, description: str) -> None:
122  """
123  Create a new type of defect; tries to figure out system ID from the
124  defect name. See also: `create_defect_with_id`, `new_system_defect`.
125 
126  Parameters:
127  `name` : name of the defect
128  `description` : a human-readable description of the defect
129  Returns:
130  the ID number of the new defect
131  """
132  sysid = choose_new_defect_id(self.defect_id_map, name)
133  log.info("Creating new defect %s: system ID %08x", name, sysid)
134  self._create_defect_with_id(sysid, name, description)
135 
136  def _create_defect_with_id(self, did: int, name: str, description: str) -> None:
137  """
138  Create a new type of defect, specifying the defect ID.
139  See also: `create_defect`, `new_system_defect`.
140 
141  Parameters:
142  `did` : defect_id
143  `name` : name of the defect
144  `description` : a human-readable description of the defect
145  """
146  log.info("Creating new defect %s (0x%08x)", name, did)
147  if did in self.defect_ids: raise DefectExistsError(did)
148  try:
149  oldname = self.normalize_defect_names(name)
150  raise DefectExistsError(f'Defect {oldname} already exists')
151  except DefectUnknownError:
152  pass
153 
154  self.defects_folder.createChannel(did,
155  name.encode('ascii'),
156  description.encode('utf-8'))
157  self._new_defect(did, name)
158 
    def retrieve(self, since: Optional[Union[int, Tuple[int,int], RunLumi]] = None,
                 until: Optional[Union[int, Tuple[int,int], RunLumi]] = None,
                 channels: Optional[Iterable[Union[str,int]]] = None, nonpresent: bool = False,
                 primary_only: bool = False, ignore: Optional[Collection[str]] = None,
                 with_primary_dependencies: bool = False, intersect: bool = False,
                 with_time: bool = False, evaluate_full: bool = True) -> IOVSet:
        """
        Retrieve defects from the database.

        Parameters:
            `since`, `until` : IoV range to query (Default: All)
            `channels` : A list of channels to query. Can contain a mixture of
                         defect names and ids. (Default: None means all
                         channels, including all virtual)
            `nonpresent` : If True, also return IoVs whose `present` flag is
                           not set. By default (False) those IoVs are
                           filtered out of the result.
            `primary_only` : Only return primary IoVs, no virtual ones.
            `ignore` : Set of defects which won't be treated as bad. Must be
                       an actual `set` instance (or None); it is handed to
                       the virtual defect calculation.
            `with_primary_dependencies` : When querying virtual flags, also get
                primary flags which went into the calculation.
            `intersect` : Intersect the result with the query range so that no
                          iov spans outside the query range
            `with_time` : Also retrieves the time the defect was inserted
                          ~2x slower queries, doesn't work for virtual defects
            `evaluate_full` : If specified, also compute the `comment` and
                              `recoverable` fields of virtual defects.
                              Causes a ~0.6x slowdown

        Returns:
            An IOVSet of the selected defects. When virtual defects are
            requested but the primary query returns nothing, an empty IOVSet
            is returned immediately.

        Raises:
            RuntimeError : if `ignore` is given but is not a `set`.
            AssertionError : if virtual channels end up being requested
                             together with `primary_only` or `with_time`.
        """
        if ignore is not None and not isinstance(ignore, set):
            raise RuntimeError("ignore parameter should be set type")

        # Only set when the query is widened to "everything" further down, in
        # which case it drives the post-query channel filter.
        desired_channels = None
        # Figure out the IDs of channels to query and their virtuality
        if channels is not None:
            query_channels = set(self.defect_names_as_ids(channels))
            # Anything that is not a primary defect ID is taken to be virtual
            virtual_channels = query_channels - self.defect_ids
            primary_channels = query_channels & self.defect_ids
        else:
            # channels=None means query all channels
            # (including all virtual, unless primary_only is set)
            if primary_only:
                virtual_channels = None
            else:
                virtual_channels = self.virtual_defect_ids
            primary_channels = set(self.defect_ids)
            query_channels = None # (all)

        primary_output_names = [self.defect_id_map[pid] for pid in primary_channels]
        virtual_output_names = [] # (optionally populated below)

        # Resolve virtual channels to query here, and the primary dependents.
        if virtual_channels:
            assert not primary_only, "Requested virtual channels with primary_only=True"
            assert not with_time, "with_time flag only works for primary defects"
            virtual_output_names = [self.virtual_defect_id_map[vid]
                                    for vid in virtual_channels]

            ordered_logics = self._resolve_evaluation_order(virtual_output_names)

            if channels is not None:
                # Since not all channels are being queried, it is necessary to
                # add the desired primary channels to the query
                primary_needed = self.resolve_primary_defects(ordered_logics)
                primary_channels |= set(self.defect_names_as_ids(primary_needed))
                query_channels = primary_channels

                # NOTE(review): kept nested under `channels is not None`
                # because `primary_needed` is only bound in this branch; the
                # listing this was recovered from had lost its indentation.
                if with_primary_dependencies:
                    primary_output_names.extend(sorted(primary_needed))

            for logic in ordered_logics:
                logic.set_evaluation(evaluate_full)
        else:
            ordered_logics = []

        # Figure out if the set of channels will produce too many ranges for COOL
        if query_channels is not None:
            query_channels = sorted(query_channels)
            query_ranges = list_to_channelselection(query_channels, None, True)

            if len(query_ranges) >= 50:
                # We're querying too many ranges. Query everything, filter later.
                desired_channels = set(primary_output_names + virtual_output_names)
                query_channels = None # (everything)

        # Retrieve primary IoVs
        primary_iovs = fetch_iovs(self.defects_folder, since, until,
                                  query_channels, self.defects_tag,
                                  named_channels=True, unicode_strings=True,
                                  with_time=with_time)

        # Calculate virtual defects (if necessary)
        if primary_only or not virtual_channels:
            result = primary_iovs
        else:
            if not primary_iovs:
                # Nothing to compute virtual defects from
                return IOVSet()
            args = (primary_iovs, ordered_logics,
                    virtual_output_names, primary_output_names,
                    since, until, ignore)
            result = calculate_virtual_defects(*args)

        # Filter out results which have their present bit removed
        # (unless otherwise specified)
        if not nonpresent:
            result = IOVSet(iov for iov in result if iov.present)

        # Filter out channels which weren't queried by the user
        # (to get around 50 channelselection COOL limit)
        if desired_channels:
            result = IOVSet(iov for iov in result
                            if iov.channel in desired_channels)

        if intersect:
            result = result.intersect_range((since, until))

        return result
274 
275  @property
276  def storage_buffer(self):
277  """
278  Gives a new context manager for use with the with statement, e.g.:
279  with ddb.storage_buffer:
280  for d in many_defects:
281  ddb.insert(...defect...)
282  """
283  assert not self._bad_state, "Please see comment in DefectsDB constructor"
284 
285  @contextmanager
286  def thunk():
287  log.debug("setupStorageBuffer()")
288  self.defects_folder.setupStorageBuffer()
289  try:
290  yield
291  except Exception:
292  log.warning("Exception raised during DefectsDB.storage_buffer. "
293  "Not flushing storage buffer - but COOL has no way "
294  "to empty it. ")
295  self._bad_state = True
296  raise
297  else:
298  log.debug("flushStorageBuffer()")
299  self.defects_folder.flushStorageBuffer()
300  return thunk()
301 
302  def insert(self, defect_id: Union[str, int], since: int, until: int, comment: str, added_by: str,
303  present: bool = True, recoverable: bool = False) -> None:
304  """
305  Insert a new defect into the database.
306 
307  Parameters:
308  `defect_id` : The name or channel identifier for the deect
309  `since`, `until` : The COOL IoV for the range
310  `comment` : String255 arbitrary text comment
311  `added_by` : The user name or "sys:"-prefixed string of the
312  application that inserted the defect
313  `present` : The state of the flag (Default: True)
314  `recoverable` : Indicates whether there is any possibility to set
315  present=False in the future (Default: False)
316  """
317  return self._insert(defect_id, since, until, comment, added_by,
318  present, recoverable, self.defects_tag)
319 
320  def _insert_iov(self, iov: IOVType, tag: str) -> None:
321  """
322  Helper function for inserting IOV objects, since record order doesn't
323  match function argument order
324  """
325  return self._insert(iov.channel, iov.since, iov.until, iov.comment,
326  iov.user, iov.present, iov.recoverable, tag)
327 
328  def _insert(self, defect_id: Union[str, int], since: Union[int, Tuple[int, int], RunLumi],
329  until: Union[int, Tuple[int, int], RunLumi], comment: str, added_by: str,
330  present: bool = True, recoverable: bool = False, tag: str = 'HEAD') -> None:
331  """
332  Implementation of insert, allows tag specification for internal
333  functions
334  """
335  assert not self._read_only, "Insertion on read-only database"
336  assert not self._bad_state, "Please see comment in DefectsDB constructor"
337 
338  # Force load of defects_folder to populate _defect_payload
339  store = self.defects_folder.storeObject
340  p = self._defect_payload
341 
342  p["present"] = present
343  p["recoverable"] = recoverable
344  p["user"] = added_by.encode('utf-8')
345  p["comment"] = comment.encode('utf-8')
346 
347  defect_id = self.defect_chan_as_id(defect_id, True)
348 
349  store(since, until, p, defect_id, tag.encode('ascii'),
350  (True if tag != 'HEAD' else False))
351 
352  def insert_multiple(self, defect_list: Iterable[IOVType],
353  tag: str = 'HEAD', use_flask: bool = False,
354  flask_cool_target: str = 'oracle://ATONR_COOLOFL_GPN/ATLAS_COOLOFL_GLOBAL',
355  flask_auth: Mapping[str, str] = {},
356  flask_db: str = 'CONDBR2',
357  flask_uri: str = 'https://cool-proxy-app.cern.ch/cool/multi_iovs'
358  ):
359  if not use_flask:
360  for defect in defect_list:
361  self._insert_iov(defect, tag)
362  else:
363  # Allow for override from environment
364  import os
365  flask_uri = os.environ.get('DQM_COOL_FLASK_URI', flask_uri)
366 
367  log.debug(f'Flask server URI: {flask_uri}')
368  self._insert_multiple_flask(defect_list, tag, flask_cool_target,
369  flask_auth, flask_db, flask_uri)
370 
371  def _insert_multiple_flask(self, defect_list: Iterable[IOVType],
372  tag: str,
373  flask_cool_target: str,
374  flask_auth: Mapping[str, str],
375  flask_db: str,
376  flask_uri: str):
377  import requests
378  import json
379  import urllib.parse
380  import os
381  from DQUtils.oracle import get_authentication
382  data = {'grant_type':'client_credentials',
383  'audience': 'cool-flask-server'}
384  data.update(flask_auth)
385  auth = requests.post('https://auth.cern.ch/auth/realms/cern/api-access/token',
386  data=data
387  )
388  if not auth:
389  raise RuntimeError('Cannot authenticate to Flask server')
390  else:
391  token = auth.json()['access_token']
392  log.debug(f'auth succeeded {token}')
393  username, password = get_authentication(flask_cool_target)
394  p = urllib.parse.urlparse(flask_cool_target)
395  if not p.hostname:
396  raise ValueError(f'Cannot interpret {flask_cool_target} as a path')
397  server, schema = p.hostname.upper(), p.path[1:]
398 
399  submit_map = {'cmd': 'addIov',
400  'COOLDEST': flask_cool_target,
401  'ORA_SCHEMA': schema,
402  'ORA_INST': flask_db,
403  'COOLOPTS': 'None',
404  'COOLSINST': flask_db,
405  'ORA_SRV': server,
406  'COOLXP': password,
407  'ORA_USER': username,
408  'cool_data': {
409  'folder': DEFECTS_FOLDER,
410  'description': '',
411  'tag': tag,
412  'record': {'present': 'Bool', 'recoverable': 'Bool',
413  'user': 'String255', 'comment': 'String255'},
414  'size': 0,
415  'iovs': []
416  }}
417 
418  for defect in defect_list:
419  submit_map['cool_data']['iovs'].append(
420  {'channel': self.defect_chan_as_id(defect.channel, True),
421  'since': defect.since,
422  'until': defect.until,
423  'payload': {'present': defect.present,
424  'recoverable': defect.recoverable,
425  'user': defect.user,
426  'comment': defect.comment
427  }
428  }
429  )
430  submit_map['cool_data']['size'] += 1
431  r = requests.post(flask_uri,
432  headers={'Authorization': f'Bearer {token}'},
433  files={'file': ('iov.json',
434  json.dumps({'cool_multi_iov_request': submit_map}))},
435  verify=('DQM_COOL_FLASK_NOVERIFY' not in os.environ)
436  )
437  log.debug(r.content)
438  if not r or r.json()['code'] != 0:
439  raise RuntimeError(f'Unable to upload defects. Flask server returned error:\n{r.json()["message"]}')
python.db.DefectsDB._tag
_tag
Definition: DQDefects/python/db.py:86
SGTest::store
TestStore store
Definition: TestStore.cxx:23
python.db.DefectsDB.connection_string
connection_string
Definition: DQDefects/python/db.py:82
python.db.DefectsDB.insert
None insert(self, Union[str, int] defect_id, int since, int until, str comment, str added_by, bool present=True, bool recoverable=False)
Definition: DQDefects/python/db.py:302
python.folders.DefectsDBFoldersMixin._clear_connections
None _clear_connections(self)
Definition: folders.py:29
python.virtual_calculator.calculate_virtual_defects
IOVSet calculate_virtual_defects(IOVSet primary_iovs, Iterable[DefectLogic] evaluation_order, Iterable[str] virtual_output_channels, Iterable[str] primary_output_channels, Optional[Union[int, Tuple[int, int], RunLumi]] since, Optional[Union[int, Tuple[int, int], RunLumi]] until, Optional[Container[str]] ignore)
Definition: virtual_calculator.py:66
python.ids.DefectsDBIDsNamesMixin.virtual_defect_id_map
MutableMapping[Union[str, int], Union[str, int]] virtual_defect_id_map(self)
Definition: ids.py:194
python.db.fetch_iovs
def fetch_iovs(folder_name, since=None, until=None, channels=None, tag="", what="all", max_records=-1, with_channel=True, loud=False, database=None, convert_time=False, named_channels=False, selection=None, runs=None, with_time=False, unicode_strings=False)
Definition: DQUtils/python/db.py:67
python.db.DefectsDB.__del__
None __del__(self)
Definition: DQDefects/python/db.py:113
python.ids.DefectsDBIDsNamesMixin.virtual_defect_ids
Set[int] virtual_defect_ids(self)
Definition: ids.py:174
python.ids.DefectsDBIDsNamesMixin.defect_ids
Set[int] defect_ids(self)
Definition: ids.py:142
python.ids.DefectsDBIDsNamesMixin._new_defect
None _new_defect(self, int did, str dname)
Definition: ids.py:121
python.folders.DefectsDBFoldersMixin._defect_payload
_defect_payload
Definition: folders.py:80
python.ids.DefectsDBIDsNamesMixin.defect_names_as_ids
List[int] defect_names_as_ids(self, Iterable[Union[str, int]] channels)
Definition: ids.py:228
python.tags.DefectsDBTagsMixin.defects_tag
str defects_tag(self)
Definition: tags.py:119
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.db.DefectsDB.__init__
None __init__(self, str connection_string=DEFAULT_CONNECTION_STRING, bool read_only=True, bool create=False, Union[str, Tuple] tag="HEAD")
Definition: DQDefects/python/db.py:61
python.exceptions.DefectExistsError
Definition: exceptions.py:3
python.db.DefectsDB
Definition: DQDefects/python/db.py:42
python.db.DefectsDB.insert_multiple
def insert_multiple(self, Iterable[IOVType] defect_list, str tag='HEAD', bool use_flask=False, str flask_cool_target='oracle://ATONR_COOLOFL_GPN/ATLAS_COOLOFL_GLOBAL', Mapping[str, str] flask_auth={}, str flask_db='CONDBR2', str flask_uri='https://cool-proxy-app.cern.ch/cool/multi_iovs')
Definition: DQDefects/python/db.py:352
python.db.DefectsDB.storage_buffer
def storage_buffer(self)
Definition: DQDefects/python/db.py:276
python.ids.DefectsDBIDsNamesMixin.defect_chan_as_id
int defect_chan_as_id(self, Union[str, int] channel, bool primary_only=False)
Definition: ids.py:203
python.virtual_mixin.DefectsDBVirtualDefectsMixin
Definition: virtual_mixin.py:40
python.virtual_mixin.DefectsDBVirtualDefectsMixin._resolve_evaluation_order
List[DefectLogic] _resolve_evaluation_order(self, Optional[Iterable[str]] defects=None)
Definition: virtual_mixin.py:184
python.db.DefectsDB.create_defect
None create_defect(self, str name, str description)
Definition: DQDefects/python/db.py:121
python.ids.DefectsDBIDsNamesMixin.normalize_defect_names
Union[str, List[str]] normalize_defect_names(self, Union[str, Iterable[str]] defect_id)
Definition: ids.py:308
python.db.DefectsDB._create_defect_with_id
None _create_defect_with_id(self, int did, str name, str description)
Definition: DQDefects/python/db.py:136
python.virtual_mixin.DefectsDBVirtualDefectsMixin.resolve_primary_defects
Set[str] resolve_primary_defects(self, Iterable[DefectLogic] defect_logics)
Definition: virtual_mixin.py:209
python.ids.DefectsDBIDsNamesMixin.defect_id_map
MutableMapping[Union[str, int], Union[str, int]] defect_id_map(self)
Definition: ids.py:163
python.oracle.get_authentication
def get_authentication(connection="oracle://ATLAS_COOLPROD/ATLAS_COOLONL_GLOBAL")
Definition: oracle.py:38
DerivationFramework::TriggerMatchingUtils::sorted
std::vector< typename T::value_type > sorted(T begin, T end)
Helper function to create a sorted vector from an unsorted one.
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:232
python.db.DefectsDB.retrieve
IOVSet retrieve(self, Optional[Union[int, Tuple[int, int], RunLumi]] since=None, Optional[Union[int, Tuple[int, int], RunLumi]] until=None, Optional[Iterable[Union[str, int]]] channels=None, bool nonpresent=False, bool primary_only=False, Optional[Collection[str]] ignore=None, bool with_primary_dependencies=False, bool intersect=False, bool with_time=False, bool evaluate_full=True)
Definition: DQDefects/python/db.py:159
python.db.DefectsDB._insert_iov
None _insert_iov(self, IOVType iov, str tag)
Definition: DQDefects/python/db.py:320
python.channel_mapping.list_to_channelselection
def list_to_channelselection(list_, convert_channel=convert_channel, as_list=False)
Definition: channel_mapping.py:42
python.db.DefectsDB._insert_multiple_flask
def _insert_multiple_flask(self, Iterable[IOVType] defect_list, str tag, str flask_cool_target, Mapping[str, str] flask_auth, str flask_db, str flask_uri)
Definition: DQDefects/python/db.py:371
python.folders.DefectsDBFoldersMixin.defect_logic_folder
cool.IFolder defect_logic_folder(self)
Definition: folders.py:100
python.db.DefectsDB._insert
None _insert(self, Union[str, int] defect_id, Union[int, Tuple[int, int], RunLumi] since, Union[int, Tuple[int, int], RunLumi] until, str comment, str added_by, bool present=True, bool recoverable=False, str tag='HEAD')
Definition: DQDefects/python/db.py:328
python.db.DefectsDB._read_only
_read_only
Definition: DQDefects/python/db.py:83
python.ids.choose_new_defect_id
int choose_new_defect_id(Mapping[Union[str, int], Union[str, int]] existing_map, str defect_name, bool virtual=False)
Definition: ids.py:48
python.db.DefectsDB._bad_state
_bad_state
Definition: DQDefects/python/db.py:103
python.folders.DefectsDBFoldersMixin.defects_folder
cool.IFolder defects_folder(self)
Definition: folders.py:67
python.CaloCondLogger.getLogger
def getLogger(name="CaloCond")
Definition: CaloCondLogger.py:16
python.db.DefectsDB._create
_create
Definition: DQDefects/python/db.py:84
python.tags.tagtype
tagtype
Definition: tags.py:18