ATLAS Offline Software
Loading...
Searching...
No Matches
DQUtils/python/db.py
Go to the documentation of this file.
1#! /usr/bin/env python
2
3# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
4
5
6from logging import getLogger; log = getLogger("DQUtils.db")
7
8import re
9import sys
10from collections import namedtuple
11from io import StringIO
12from datetime import datetime
13from keyword import iskeyword
14from os.path import dirname
15
16from CoolConvUtilities.AtlCoolLib import indirectOpen
17
18from .channel_mapping import make_channelselection, get_channel_ids_names
19from .selection import make_browse_objects_selection
20from .sugar import IOVSet, RunLumi, RunLumiType, TimestampType, make_iov_type
21
22DEFAULT_DBNAME = "CONDBR2"
23
def make_safe_fields(fields):
    """
    Return a new list of field names that are safe to use as identifiers.

    Every entry of `fields` that is a Python keyword (as reported by
    `keyword.iskeyword`) gets a trailing underscore; all other entries are
    copied through unchanged. Order is preserved.
    """
    safe_fields = []
    for name in fields:
        if iskeyword(name):
            name = name + "_"
        safe_fields.append(name)
    return safe_fields
26
def get_query_range(since, until, runs):
    """
    Normalise `since`, `until` and `runs` into a `(since, until)` pair.

    * `runs` given (truthy) and neither `since` nor `until` given:
        - a tuple is read as (from_run, to_run) and becomes the range
          (from_run, 0) .. (to_run, 0)
        - an int becomes the range (run, 0) .. (run + 1, 0)
        - anything else raises RuntimeError
    * `runs` given together with `since` and/or `until`: RuntimeError
    * otherwise a missing `since` defaults to 0 and a missing `until`
      defaults to 2**63-1.

    Afterwards each bound is converted according to its type: tuples through
    `RunLumi(*bound)`, strings through `TimestampType.from_string` and
    datetime objects through `TimestampType.from_date`.

    An AssertionError is raised if the resulting `since` is greater than
    `until`.
    """
    range_given = since is not None or until is not None

    if runs and not range_given:
        from builtins import int
        if isinstance(runs, tuple):
            first_run, last_run = runs[0], runs[1]
            since = (first_run, 0)
            until = (last_run, 0)
        elif isinstance(runs, int):
            since = (runs, 0)
            until = (runs + 1, 0)
        else:
            raise RuntimeError("Invalid type for `runs`, should be int or tuple")

    elif runs:
        raise RuntimeError("Specify (since and/or until), OR runs, not both")

    else:
        since = 0 if since is None else since
        until = 2**63-1 if until is None else until

    # Conversions are applied stage by stage, `since` before `until` within
    # each stage. The converters are wrapped in lambdas so that RunLumi and
    # TimestampType are only looked up when a bound actually needs them.
    conversions = (
        (tuple, lambda bound: RunLumi(*bound)),
        (str, lambda bound: TimestampType.from_string(bound)),
        (datetime, lambda bound: TimestampType.from_date(bound)),
    )
    for bound_type, convert in conversions:
        if isinstance(since, bound_type):
            since = convert(since)
        if isinstance(until, bound_type):
            until = convert(until)

    assert since <= until, "Bad query range (since > until?)"

    return since, until
64
def fetch_iovs(folder_name, since=None, until=None, channels=None, tag="",
               what="all", max_records=-1, with_channel=True, loud=False,
               database=None, convert_time=False, named_channels=False,
               selection=None, runs=None, with_time=False, unicode_strings=False):
    """
    Fetch intervals of validity from a COOL folder and return them as an IOVSet.

    `folder_name` either a string (an abbreviated name such as DQMFONL or a
                  fully-qualified name such as /GLOBAL/DETSTATUS/DQMFONL),
                  resolved through `Databases.get_folder`, or an object
                  providing `fullPath()` which is used as the folder directly
    `since`, `until`, `runs` are normalised by `get_query_range`; see there
                  for the accepted forms
    `channels` is handed to `make_channelselection`; an empty list short
                  circuits and returns an empty IOVSet
    `tag` COOL folder tag
    `what` list of payload field names to fetch; the string "all" selects
                  every key of the payload specification
    `max_records`, `loud`, `unicode_strings` are forwarded unchanged to
                  `quick_retrieve` (non-coracool folders only)
    `with_channel` adds a "channel" field to each record (non-coracool only)
    `with_time` adds an "insertion_time" field to each record
                  (non-coracool only)
    `database` abbreviated database or connection string, used when
                  resolving `folder_name`
    `convert_time` for time-based folders, translate the query range into
                  timestamps using the StartTime/EndTime of the LBLB records
                  covering it (this is also done whenever `runs` is given)
    `named_channels` pass the folder's channel map to `quick_retrieve`
                  (non-coracool only)
    `selection` if truthy, it is turned into a browse selection with
                  `make_browse_objects_selection` and passed to
                  `browseObjects` (non-coracool only)
    """
    from .quick_retrieve import quick_retrieve, browse_coracool, get_coracool_payload_spec

    # An explicitly empty channel list cannot match anything.
    if channels == []:
        return IOVSet()

    since, until = get_query_range(since, until, runs)

    if isinstance(folder_name, str):
        folder = Databases.get_folder(folder_name, database)
    else:
        # Not a string: treat the argument as the folder object itself.
        try:
            folder = folder_name
            folder_name = folder.fullPath()
        except Exception:
            log.error("Exception when interpreting folder: {0}".format(folder_name))
            raise

    log.info("Querying %s", folder_name)
    log.debug("Query range: [%s, %s]", since, until)

    short_folder = folder.fullPath().split("/")[-1]

    time_based_folder = "<timeStamp>time</timeStamp>" in folder.description()
    coracool_folder = "<coracool>" in folder.description()
    if time_based_folder:
        iov_key_type = TimestampType
    else:
        iov_key_type = RunLumiType

    if time_based_folder and (convert_time or runs):
        # Translate the run-based range into a time-based one via LBLB.
        # Note: probably inadvisable for long ranges, since every luminosity
        # block in the query range has to be retrieved.
        until = min(until, RunLumi(100000000, 0))
        runrange = fetch_iovs("LBLB", since, until)
        if not runrange:
            # No luminosity blocks found for the range: nothing to return.
            return IOVSet()

        since, until = runrange.first.StartTime, runrange.last.EndTime
        # `runs` is deliberately not forwarded and `convert_time` is switched
        # off, so the nested call does not enter this branch again.
        return fetch_iovs(folder_name, since, until, channels, tag, what,
                          max_records, with_channel, loud, database,
                          convert_time=False,
                          named_channels=named_channels,
                          selection=selection,
                          with_time=with_time,
                          unicode_strings=unicode_strings)

    detstatus_names = ("DQMFOFL", "DCSOFL", "DQMFONL", "SHIFTOFL", "SHIFTONL",
                       "LBSUMM")
    if short_folder.endswith(detstatus_names):
        # get channel mapping from channel_mapping.py
        channel_mapping = None
    else:
        # Build a two-way (id <-> name) lookup from the folder's channel map.
        _, _, channelmap = get_channel_ids_names(folder)
        cm_reversed = {value: key for key, value in channelmap.items()}
        channelmap.update(cm_reversed)
        channel_mapping = channelmap

    channels = make_channelselection(channels, channel_mapping)

    field_name = "%s_VAL" % short_folder

    if coracool_folder:
        database, folder_path = Databases.resolve_folder_string(folder_name,
                                                                database)

        # Coracool
        assert database, "Coracool folder - you must specify a database"

        db = Databases.get_instance(database)
        spec = get_coracool_payload_spec(db, folder_path)
        if what == "all":
            what = spec.keys()

        assert isinstance(what, list), ("Coracool folder - you must specify "
            "`what` to fetch (it cannot be inferred, as with non-coracool.)")

        record = make_iov_type(field_name, ["channel", "elements"])

        element = namedtuple("element", " ".join(make_safe_fields(what)))

        result = browse_coracool(db, folder_path, since, until, channels, "",
                                 what, record, element, iov_key_type)

    else:
        if what == "all":
            what = folder.folderSpecification().payloadSpecification().keys()

        channelmap = None
        if named_channels:
            _, _, channelmap = get_channel_ids_names(folder)

        folder.setPrefetchAll(False)

        browse_args = [since, until, channels, tag]
        if selection:
            browse_args.append(make_browse_objects_selection(folder, selection))
        iterator = folder.browseObjects(*browse_args)

        # Record layout: optional bookkeeping fields first, then the payload.
        fields = []
        if with_time:
            fields.append("insertion_time")
        if with_channel:
            fields.append("channel")
        fields.extend(what)

        record = make_iov_type(field_name, make_safe_fields(fields))

        result = quick_retrieve(iterator, record, what, max_records,
                                with_channel, loud, iov_key_type, channelmap,
                                with_time, unicode_strings)

    return IOVSet(result, iov_type=record, origin=short_folder)
210
def write_iovs(folder_name, iovs, record, multiversion=True, tag="",
               create=False, storage_buffer=False):
    """
    Store every entry of `iovs` in the folder `folder_name`.

    The folder and a payload object are obtained from
    `Databases.fetch_for_writing(folder_name, multiversion, record, create)`.
    For each iov, all fields from index 3 onwards are copied into the payload
    by name, and the payload is stored with the iov's `since`, `until` and
    `channel` under `tag`.

    If `storage_buffer` is true the folder's storage buffer is set up before
    writing and flushed once all iovs have been stored.
    """
    _db, folder, payload = Databases.fetch_for_writing(
        folder_name, multiversion, record, create)

    if storage_buffer:
        folder.setupStorageBuffer()

    total_iovs = len(iovs)
    for index, iov in enumerate(iovs):
        # The first three fields are skipped; the rest is payload.
        payload_names = iov._fields[3:]
        payload_values = iov[3:]
        for name, value in zip(payload_names, payload_values):
            payload[name] = value

        folder.storeObject(iov.since, iov.until, payload, iov.channel, tag)

        # Progress report on every 1000th record, starting with index 0.
        if index % 1000 == 0:
            log.debug("Wrote %5i / %5i records", index, total_iovs)

    if storage_buffer:
        log.debug("Flushing records to database...")
        folder.flushStorageBuffer()
        log.debug("... done")
232
234 """
235 Databases helper class. Used as a singleton. (don't instantiate)
236 Specifies abbreviations for database connection strings and folders
237 """
238
239 FOLDERS = {
240 "DQMFONL" : "COOLONL_GLOBAL::/GLOBAL/DETSTATUS/DQMFONL",
241 "DQMFONLLB" : "COOLONL_GLOBAL::/GLOBAL/DETSTATUS/DQMFONLLB",
242 "SHIFTONL" : "COOLONL_GLOBAL::/GLOBAL/DETSTATUS/SHIFTONL",
243
244 "DQMFOFL" : "COOLOFL_GLOBAL::/GLOBAL/DETSTATUS/DQMFOFL",
245 "DCSOFL" : "COOLOFL_GLOBAL::/GLOBAL/DETSTATUS/DCSOFL",
246 "DQCALCOFL" : "COOLOFL_GLOBAL::/GLOBAL/DETSTATUS/DQCALCOFL",
247 "SHIFTOFL" : "COOLOFL_GLOBAL::/GLOBAL/DETSTATUS/SHIFTOFL",
248 "LBSUMM" : "COOLOFL_GLOBAL::/GLOBAL/DETSTATUS/LBSUMM",
249 "VIRTUALFLAGS" : "COOLOFL_GLOBAL::/GLOBAL/DETSTATUS/VIRTUALFLAGS",
250
251 "DEFECTS" : "COOLOFL_GLOBAL::/GLOBAL/DETSTATUS/DEFECTS",
252
253 "SOR_Params" : "COOLONL_TDAQ::/TDAQ/RunCtrl/SOR_Params",
254 "EOR_Params" : "COOLONL_TDAQ::/TDAQ/RunCtrl/EOR_Params",
255 "SOR" : "COOLONL_TDAQ::/TDAQ/RunCtrl/SOR",
256 "EOR" : "COOLONL_TDAQ::/TDAQ/RunCtrl/EOR",
257
258 "LBLB" : "COOLONL_TRIGGER::/TRIGGER/LUMI/LBLB",
259 "LBTIME" : "COOLONL_TRIGGER::/TRIGGER/LUMI/LBTIME",
260 "LBLESTONL" : "COOLONL_TRIGGER::/TRIGGER/LUMI/LBLESTONL",
261 "LVL1COUNTERS" : "COOLONL_TRIGGER::/TRIGGER/LUMI/LVL1COUNTERS",
262 "HLTCOUNTERS" : "COOLOFL_TRIGGER::/TRIGGER/LUMI/HLTCOUNTERS",
263 "HLT/Menu" : "COOLONL_TRIGGER::/TRIGGER/HLT/Menu",
264 "LVL1/Menu" : "COOLONL_TRIGGER::/TRIGGER/LVL1/Menu",
265 "Prescales" : "COOLONL_TRIGGER::/TRIGGER/LVL1/Prescales",
266 }
267
268 @classmethod
269 def resolve_folder_string(cls, folder_name, db_override=None):
270 """
271 Resolves a simplified folder URI.
272
273 Examples:
274 folder_name == "SHIFTOFL"
275 will give a connection to COOLOFL_GLOBAL/COMP200
276 folder /GLOBAL/DETSTATUS/SHIFTOFL
277 folder_name == "test.db::SHIFTOFL"
278 will give a connection to an sqlite database test.db
279
280 Priority:
281 * Database specified in db_override
282 * Database specified in `folder_name`
283 * Database specified in cls.FOLDERS[folder_name]
284 """
285 res_database = db_override
286
287 # First check - is a database specified in the folder name?
288 if "::" in folder_name:
289 assert folder_name.count("::") == 1, "Bad folder format"
290 database, folder_name = folder_name.split("::")
291
292 # If res_database hasn't already been set, do so
293 res_database = database if not res_database else res_database
294
295 if folder_name in cls.FOLDERS:
296 database, res_folder = cls.FOLDERS[folder_name].split("::")
297 res_database = database if not res_database else res_database
298 else:
299 res_folder = folder_name
300
301 return res_database, res_folder
302
303 @classmethod
304 def get_folder(cls, folder_name, db_override=None, read_only=True,
305 create_function=None, also_db=False):
306 """
307 Retrieve a folder. The `db_override` parameter allows over-riding the
308 database which comes from the folder string.
309
310 Parameters:
311 `folder_name` : Folder name or alias to load
312 `db_override` : If specified, causes an alternate database string
313 to be used.
314 `read_only` : Specifies if a read-only database connection should
315 be used.
316 `create_function` : If specified, function to be called in case the
317 folder doesn't exist. It is passed the database
318 connection.
319 `also_db` : Make the return value (db, folder)
320 """
321
322 if read_only:
323 assert not create_function, "`read_only` specified with `create`"
324
325 database, folder = cls.resolve_folder_string(folder_name, db_override)
326 assert database, ("Unable to resolve database for (%s, %s)"
327 % (folder_name, db_override))
328
329 create = bool(create_function)
330 db = cls.get_instance(database, read_only, create)
331
332 try:
333 cool_folder = db.getFolder(folder)
334 except Exception as error:
335 log.debug('HELP! %s', error.args)
336 args = str(error)
337 log.debug('THIS IS %s', type(args))
338 log.debug('Value of boolean: %s', ("not found" in args))
339 if not ("cannot be established" in args or
340 "not found" in args
341 ):
342 log.exception("Unknown error encountered whilst opening "
343 "database connection to '%s'",
344 database)
345 raise
346
347 if not create_function:
348 log.exception("Database does not exist, `create_function` not "
349 "specified")
350 raise
351
352 cool_folder = create_function(db)
353 if also_db:
354 return db, cool_folder
355 return cool_folder
356
357 @classmethod
358 def resolve_db_string(cls, db_string, read_only=True):
359 """
360 Take a database string - if it looks like a filename ending in ".db"
361 then assume it is a sqlite database with that name.
362
363 If the `db_string` is prepended with "WRITE|" then a writable connection
364 is requested.
365
366 If the db_string doesn't contain a "/", then "/" + DEFAULT_DBNAME is
367 appended.
368 """
369 if "://" in db_string:
370 # Assume that the string is already resolved
371 return db_string, read_only
372
373 if db_string.startswith("WRITE|"):
374 assert db_string.count("|") == 1, "Bad db_string format"
375 db_string = db_string.split("|")[1]
376 read_only = False
377
378 sqlite_regex = re.compile(r"(?P<filename>.*?\.db)(?:/?(?P<dbname>[^/]+))?$")
379 matched = sqlite_regex.match(db_string)
380 if matched:
381 filename, dbname = matched.groups()
382 dbname = DEFAULT_DBNAME if not dbname else dbname
383 db_string = "sqlite://schema=%s;dbname=%s" % (filename, dbname)
384 else:
385 if "/" not in db_string:
386 return db_string + "/" + DEFAULT_DBNAME, read_only
387
388 return db_string, read_only
389
390 @classmethod
391 def get_instance(cls, db_string, read_only=True, create=False):
392 """
393 Open a database connection
394 """
395 res_db_string, read_only = cls.resolve_db_string(db_string, read_only)
396
397 try:
398 prev_stdout = sys.stdout
399 sys.stdout = StringIO()
400 try:
401 connection = indirectOpen(res_db_string, readOnly=read_only)
402 finally:
403 sys.stdout = prev_stdout
404 except Exception as e:
405 if ((e.args and "The database does not exist" in e.args[0]) or
406 str(e).find ('The database does not exist') >= 0) and not create:
407 log.info("Failed trying to connect to '%s'", res_db_string)
408 raise
409 from PyCool import cool
410 dbService = cool.DatabaseSvcFactory.databaseService()
411 connection = dbService.createDatabase(res_db_string)
412 return connection
413
414 @classmethod
415 def build_folder(cls, db, folder_name, multiversion, record):
416 """
417 Create `folder_name` on database instance `db`, with recordspecification
418 `record`.
419
420 Also creates folderset to which folder_name belongs if necessary.
421 """
422 from PyCool import cool
423
424 folderset_path = dirname(folder_name)
425 try:
426 db.getFolderSet(folderset_path)
427 except Exception as error:
428 caught_error = "Folder set %s not found" % folderset_path
429 if caught_error not in error.args[0]:
430 raise
431
432 log.debug("Folderset doesn't exist - creating it.")
433 db.createFolderSet(folderset_path, "", True)
434
435 if not isinstance(record, cool.RecordSpecification):
436 record_spec = cool.RecordSpecification()
437 for field in record:
438 record_spec.extend(*field)
439 else:
440 record_spec = record
441
442 FV = cool.FolderVersioning
443 versioning = FV.MULTI_VERSION if multiversion else FV.SINGLE_VERSION
444 folder_spec = cool.FolderSpecification(versioning, record_spec)
445 folder = db.createFolder(folder_name, folder_spec)
446 payload = cool.Record(record_spec)
447
448 return folder, payload
449
450 @classmethod
451 def fetch_for_writing(cls, orig_folder_name, multiversion=True, record=None,
452 create=False, db_override=None):
453 """
454 Retrieve a folder for writing. Creates it if it doesn't exist.
455
456 `folder_name` specifies folder to be queried
457 `multiversion` specifies COOL versioning mode
458 `record` is a list of fields to be created in the form:
459 [("<field name>", cool.StorageType.<field type>), ...]
460 or if None, defaults to one Code record,
461 or if isinstance(record, cool.RecordSpecification), uses this.
462 `create` should the database be created if it doesn't
463 `db_override` overrides automatic detection of database string
464 """
465 from PyCool import cool
466 if record is None:
467 record = [("Code", cool.StorageType.Int32)]
468
469 database, folder_name = cls.resolve_folder_string(orig_folder_name)
470 if db_override:
471 database = db_override
472
473 try:
474 db = cls.get_instance(database, False)
475
476 except Exception as error:
477 if not create or "The database does not exist" not in error.args[0]:
478 raise
479
480 from PyCool import cool
481 dbService = cool.DatabaseSvcFactory.databaseService()
482
483 resolved_database, _ = cls.resolve_db_string(database)
484
485 log.info("Database doesn't exist - creating it.")
486 db = dbService.createDatabase(resolved_database)
487
488 try:
489 folder = db.getFolder(folder_name)
490 payload = cool.Record(folder.payloadSpecification())
491 except Exception as error:
492 if not create or "Folder %s not found" % folder_name not in error.args[0]:
493 raise
494
495 log.debug("Folder doesn't exist - creating it.")
496 args = db, folder_name, multiversion, record
497 folder, payload = cls.build_folder(*args)
498
499 return db, folder, payload
#define min(a, b)
Definition cfImp.cxx:40
get_instance(cls, db_string, read_only=True, create=False)
resolve_folder_string(cls, folder_name, db_override=None)
build_folder(cls, db, folder_name, multiversion, record)
resolve_db_string(cls, db_string, read_only=True)
fetch_for_writing(cls, orig_folder_name, multiversion=True, record=None, create=False, db_override=None)
get_folder(cls, folder_name, db_override=None, read_only=True, create_function=None, also_db=False)
const cool::RecordSpecification get_coracool_payload_spec(IDatabasePtr cooldb, const string &folder)
PyObject * browse_coracool(IDatabasePtr cooldb, const string &folder, ValidityKey since, ValidityKey until, const ChannelSelection &cs=ChannelSelection::all(), const char *tag="", PyObject *to_fetch=NULL, PyObject *object_converter=NULL, PyObject *inner_object_converter=NULL, PyObject *iovkey_wrapper=NULL)
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
make_safe_fields(fields)
get_query_range(since, until, runs)
fetch_iovs(folder_name, since=None, until=None, channels=None, tag="", what="all", max_records=-1, with_channel=True, loud=False, database=None, convert_time=False, named_channels=False, selection=None, runs=None, with_time=False, unicode_strings=False)
write_iovs(folder_name, iovs, record, multiversion=True, tag="", create=False, storage_buffer=False)
std::string dirname(std::string name)
Definition utils.cxx:200