ATLAS Offline Software
Loading...
Searching...
No Matches
TriggerConfigAccessBase.py
Go to the documentation of this file.
1# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2
3from abc import ABC, abstractmethod
4import os
5import json
6import re
7from typing import Any
8import xml.etree.ElementTree as ET
9from TrigConfStorage.TriggerCrestUtil import TriggerCrestUtil
10import coral
11
12from AthenaCommon.Logging import logging
13log = logging.getLogger('TriggerConfigAccessBase.py')
14
15def getFileType(filename):
16 filetype = "unknown"
17 with open(filename, 'r') as fp:
18 config = json.load(fp)
19 filetype = config['filetype']
20 return filetype
21
22from enum import Enum
23class ConfigType(Enum):
24 NONE = ("Config", "None", "None")
25 L1MENU = ("L1Menu", "l1menu", "L1M")
26 HLTMENU = ("HLTMenu", "hltmenu", "HLTM")
27 L1PS = ("L1PrescalesSet", "l1prescale", "L1PS")
28 HLTPS = ("HLTPrescalesSet", "hltprescale", "HLTPS")
29 BGS = ("L1BunchGroupsSet", "bunchgroupset", "BGS")
30 HLTJO = ("HLTJobOptions", "joboptions", "JO")
31 HLTMON = ("HLTMonitoring", "hltmonitoringsummary", "MGS")
32 def __init__(self, basename, filetype, crestkey):
33 self.basename = basename
34 self.filetype = filetype
35 self.crestkey = crestkey
36 def __eq__(self, other):
37 if isinstance(other,str):
38 return self.filetype == other
39 else:
40 return self.filetype == other.filetype
41 def __ne__(self, other):
42 return not self.__eq__(other)
43
44class ConfigLoader(ABC):
45 """
46 ConfigLoader derived classes hold the information of the configuration source
47 and define the method to load the configuration
48 """
49 def __init__(self,configType: ConfigType):
50 self.configType: ConfigType = configType
51 def confirmConfigType(self,config):
52 """
53 checks that the in-file specification of the configuration type matches the expected type
54 """
55 if config['filetype'] != self.configType:
56 raise RuntimeError("Can not load file with filetype '%s' when expecting '%s'" % (config['filetype'], self.configType.filetype))
57
58 @abstractmethod
59 def load(self) -> dict[str, Any]:
60 pass
61
62 @abstractmethod
63 def getWriteFilename(self) -> str:
64 pass
65
66 @abstractmethod
67 def setQuery(self, query):
68 pass
69
70class ConfigFileLoader(ConfigLoader):
71 def __init__(self, configType, filename ):
72 super(ConfigFileLoader,self).__init__(configType)
73 self.filename = filename
74 def load(self) -> dict[str, Any]:
75 with open(self.filename, 'r') as fp:
76 config = json.load(fp)
77 self.confirmConfigType(config)
78 return config
79 def setQuery(self, query):
80 pass
82 outfn = os.path.basename(self.filename)
83 if outfn.endswith(".json"):
84 outfn = outfn.rsplit('.',1)[0]
85 return outfn + ".out.json"
86
88 """Class to load from json string"""
89 def __init__(self, configType, jsonString):
90 super(ConfigDirectLoader,self).__init__(configType)
91 self.jsonString = jsonString
92 def load(self) -> dict[str, Any]:
93 config = json.loads(self.jsonString)
94 self.confirmConfigType(config)
95 return config
96 def setQuery(self, query):
97 pass
99 pass
100
101class ConfigDBLoader(ConfigLoader):
102 def __init__(self, configType, dbalias, dbkey):
103 super().__init__(configType)
104 self.dbalias = dbalias
105 self.dbkey = dbkey
106 self.query = {}
107 self.schema = None
108
109 def setQuery(self, query):
110 """
111 query template is a dictionary of queries, identified by schema version,
112 similar to TrigConf::TrigDBMenuLoader::m_hltQueries and TrigConf::TrigDBMenuLoader::m_l1Queries
113 """
114 self.query = query
115
116 @staticmethod
117 def getResolvedFileName(filename, pathenv=""):
118 """ looks for file, first absolute, then by resolving envvar pathenv"""
119 if os.access(filename,os.R_OK):
120 return filename
121 pathlist = os.getenv(pathenv,'').split(os.pathsep)
122 for path in pathlist:
123 f = os.path.join( path, filename )
124 if os.access( f, os.R_OK ):
125 return f
126 raise RuntimeError("Can't read file %s, neither locally nor in %s" % (filename, pathenv) )
127
128 @staticmethod
130 dblookupFile = ConfigDBLoader.getResolvedFileName("dblookup.xml", "CORAL_DBLOOKUP_PATH")
131 dbp = ET.parse(dblookupFile)
132 listOfServices = []
133 foundAlias = False
134 for logSvc in dbp.iter("logicalservice"):
135 if logSvc.attrib["name"] != dbalias:
136 continue
137 foundAlias = True
138 listOfServices = [ serv.attrib["name"] for serv in logSvc.iter("service") ]
139 if len(listOfServices) == 0:
140 raise RuntimeError("DB %s has no services listed in %s" % (dbalias, dblookupFile))
141 break
142 if not foundAlias:
143 raise RuntimeError("DB %s not available in %s" % (dbalias, dblookupFile))
144
145 if "FRONTIER_SERVER" not in os.environ:
146 # remove all frontier connnections in the list if the environment FRONTIER_SERVER variable does not exist
147 # this speeds up the resolution of the connection specification (dbalias)
148 listOfServices: list[str] = [svc for svc in listOfServices if not svc.startswith("frontier:")]
149
150 # now get the account and pw for oracle connections
151 credentials: dict[str, Any] = dict.fromkeys(listOfServices)
152
153 for svc in filter(lambda s : s.startswith("frontier:"), listOfServices):
154 credentials[svc] = {}
155 credentials[svc]["user"] = svc
156 credentials[svc]["password"] = ""
157
158 try:
159 authFile = ConfigDBLoader.getResolvedFileName("authentication.xml", "CORAL_AUTH_PATH")
160 except Exception as e:
161 log.warning("File authentication.xml is not available! Oracle connection cannot be established. Exception message is: %s",e)
162 else:
163 for svc in filter(lambda s : s.startswith("oracle:"), listOfServices):
164 ap = ET.parse(authFile)
165 count = 0
166 for con in filter( lambda c: c.attrib["name"]==svc, ap.iter("connection")):
167 credentials[svc] = dict([(par.attrib["name"],par.attrib["value"]) for par in con])
168 count += 1
169 if count==0:
170 raise RuntimeError("No credentials found for connection %s from service %s for db %s" % (con,svc,dbalias))
171 if count>1:
172 raise RuntimeError("More than 1 connection found in %s for service %s" % (authFile, svc))
173
174 return credentials
175
176 @staticmethod
177 def getSchema(connStr):
178 ''' Read schema from connection string '''
179 if connStr.startswith("oracle:"):
180 [_, schema] = connStr.split("/")[-2:]
181 return schema
182
183 if connStr.startswith("frontier:"):
184 import re
185 pattern = r"frontier://ATLF/\‍(\‍)/(.*)"
186 m = re.match(pattern, connStr)
187 if not m:
188 raise RuntimeError("connection string '%s' doesn't match the pattern '%s'?" % (connStr, pattern))
189 (schema, ) = m.groups()
190 return schema
191
192 if connStr.startswith("sqlite_file:"):
193 raise NotImplementedError("Python-loading of trigger configuration from sqlite has not yet been implemented")
194
195 @staticmethod
196 def readSchemaVersion(qdict, session):
197 ''' Read schema version form database, based on TrigConf::TrigDBLoader::schemaVersion '''
198 try:
199 q = "SELECT TS_TAG FROM {schema}.TRIGGER_SCHEMA TS"
200 query = ConfigDBLoader.getCoralQuery(session, q.format(**qdict))
201 cursor = query.execute()
202 cursor.next()
203
204 versionTag = cursor.currentRow()[0].data()
205
206 versionTagPrefix = "Trigger-Run3-Schema-v"
207 if not versionTag.startswith(versionTagPrefix):
208 raise RuntimeError( "Tag format error: Trigger schema version tag %s does not start with %s", versionTag, versionTagPrefix)
209
210 vstr = versionTag[len(versionTagPrefix):]
211
212 if not vstr.isdigit():
213 raise RuntimeError( "Invalid argument when interpreting the version part %s of schema tag %s is %s", vstr, versionTag, type(vstr))
214
215 log.debug("Found schema version %s", vstr)
216 return int(vstr)
217
218 except Exception as e:
219 log.warning("Failed to read schema version: %r", e)
220
221 @staticmethod
222 def getCoralQuery(session, queryStr, qdict = None):
223 ''' Parse output, tables and condition from the query string into coral query object'''
224 query = session.nominalSchema().newQuery()
225
226 if qdict is not None:
227 queryStr = queryStr.format(**qdict)
228
229 # bind vars
230 bindVars = coral.AttributeList() # type: ignore
231 bindVarsInQuery = re.findall(r":(\w*)", queryStr)
232 if len(bindVarsInQuery) > 0 and qdict is None:
233 log.error("Query has bound-variable syntax but no value dictionary is provided. Query: %s", queryStr)
234 for k in bindVarsInQuery:
235 bindVars.extend(k, "int")
236 bindVars[k].setData(qdict[k])
237
238 output = queryStr.split("SELECT")[1].split("FROM")[0]
239 for field in output.split(','):
240 query.addToOutputList(field)
241
242 log.debug("Conversion for Coral of query: %s", queryStr)
243
244 for table in queryStr.split("FROM")[1].split("WHERE")[0].split(","):
245 tableSplit = list(filter(None, table.split(" ")))
246 # Schema name is stripped from TableList in Coral query
247 query.addToTableList(tableSplit[0].split(".")[1], tableSplit[1])
248
249 if "WHERE" in queryStr:
250 cond = queryStr.split("WHERE")[1]
251 m = re.match("(.*)(?i: ORDER *BY )(.*)", cond) # check for "order by" clause
252 if m:
253 where, order = m.groups()
254 query.setCondition(where, bindVars)
255 query.addToOrderList(order)
256 else:
257 query.setCondition(cond, bindVars)
258
259 return query
260
261 def getQueryDefinition(self, schemaVersion):
262 '''Choose query based on schema version, based on TrigConf::TrigDBLoader::getQueryDefinition '''
263 maxDefVersion = 0
264 for vkey in self.query.keys():
265 if vkey>maxDefVersion and vkey<=schemaVersion:
266 maxDefVersion = vkey
267
268 if maxDefVersion == 0:
269 raise RuntimeError("No query available for schema version {0}".format(schemaVersion))
270
271 return self.query[maxDefVersion]
272
273 def load(self) -> dict[str, Any]:
274 credentials: dict[str,Any] = ConfigDBLoader.getConnectionParameters(self.dbalias)
275
276 if not credentials:
277 log.error("No TriggerDB connections found for %s", self.dbalias)
278 raise RuntimeError(f"No TriggerDB connections found for {self.dbalias}")
279
280 svc = coral.ConnectionService() # type: ignore
281 svcconfig = svc.configuration()
282 svcconfig.disablePoolAutomaticCleanUp()
283 svcconfig.setConnectionTimeOut(0)
284
285 failureMode = 0
286 for credential in credentials:
287 log.debug("Trying credentials %s",credential)
288
289 try:
290 session = svc.connect(credential, coral.access_ReadOnly) # type: ignore
291 except Exception as e:
292 log.warning("Failed to establish connection: %s",e)
293 failureMode = max(1, failureMode)
294 continue
295
296 # Check that the FRONTIER_SERVER is set properly, if not reduce the retrial period and time out values
297 if 'FRONTIER_SERVER' in os.environ and os.environ['FRONTIER_SERVER']:
298 svcconfig.setConnectionRetrialPeriod(300)
299 svcconfig.setConnectionRetrialTimeOut(3600)
300 else:
301 svcconfig.setConnectionRetrialPeriod(1)
302 svcconfig.setConnectionRetrialTimeOut(1)
303
304 try:
305 session.transaction().start(True) # readOnly
306 self.schema = ConfigDBLoader.getSchema(credential)
307 qdict = { "schema" : self.schema, "dbkey" : self.dbkey }
308
309 # Choose query based on schema
310 schemaVersion = ConfigDBLoader.readSchemaVersion(qdict, session)
311 qstr = self.getQueryDefinition(schemaVersion)
312 # execute coral query
313 query = ConfigDBLoader.getCoralQuery(session, qstr, qdict)
314 cursor = query.execute()
315
316 except Exception as e:
317 log.warning(f"DB query on {credential} failed to execute.")
318 log.warning("Exception message: %r", e)
319 failureMode = max(2, failureMode)
320 continue # to next source
321
322 # Read query result
323 if not cursor.next():
324 # empty result
325 log.warning(f"DB query on {credential} returned empty result, likely due to non-existing key {self.dbkey}")
326 failureMode = 3
327 continue # to next source
328
329 configblob = cursor.currentRow()[0].data()
330 if type(configblob) is not str:
331 configblob = configblob.readline()
332 config = json.loads(configblob)
333 session.transaction().commit()
334
335 self.confirmConfigType(config)
336 return config
337
338 if failureMode == 1:
339 log.error("TriggerDB query: could not connect to any source for %s", self.configType.basename)
340 log.error("Considered sources: %s", ", ".join(credentials))
341 raise RuntimeError("TriggerDB query: could not connect to any source", self.configType.basename)
342 if failureMode == 2:
343 log.error("Query failed due to wrong definition for %s", self.configType.basename)
344 log.error("DB query was: %s", qstr.format(**qdict))
345 raise RuntimeError("Query failed due to wrong definition", self.configType.basename)
346 elif failureMode == 3:
347 log.error("DB key %s does not exist for %s", self.dbkey, self.configType.basename)
348 raise KeyError("DB key does not exist", self.dbkey, self.configType.basename)
349 else:
350 raise RuntimeError("Query failed for unknown reason")
351
352 # proposed filename when writing config to file
353 def getWriteFilename(self) -> str:
354 return "{basename}_{schema}_{dbkey}.json".format(basename = self.configType.basename, schema = self.schema, dbkey = self.dbkey)
355
357 def __init__(self, *, configType: ConfigType, dbname: str, dbkey: int, crestServer: str):
358 super().__init__(configType)
359 self.crestServer = crestServer
360 self.dbname = dbname
361 self.dbkey = dbkey
362 self.schema = ""
363
364 def setQuery(self, query):
365 """
366 With CREST all queries are defined in the CREST server
367 """
368 pass
369
370 def _get_payload(self, hash: str) -> dict:
371 """get payload from crest server using request library
372
373 Args:
374 hash (str): the query part of the url as required by the REST api
375
376 Raises:
377 RuntimeError: when connection or query failed
378
379 Returns:
380 dict: the json content
381 """
382 import requests
383
384 url = f"{self.crestServer}/payloads/data"
385 params = {
386 "format": "BLOB",
387 "hash": hash
388 }
389 preq = requests.Request(method='GET', url=url, params=params).prepare()
390 with requests.Session() as session:
391 try:
392 resp = session.send(preq)
393 except requests.ConnectionError as exc:
394 log.error(f"Could not connect to crest server {self.crestServer} ({exc})")
395 raise RuntimeError(f"Could not connect to CREST server {self.crestServer}")
396
397 if resp.status_code != 200:
398 log.error(f"Error: HTTP GET request '{preq.url}' failed")
399 raise RuntimeError(f"Query {hash} to crest failed with status code {resp.status_code}")
400
401 config = json.loads(resp.content)
402 self.confirmConfigType(config)
403 return config
404
405 def load(self) -> dict[str, Any]:
406 # see SCHEMA_MAP in https://gitlab.cern.ch/crest-db/crest/-/blob/master/src/main/java/hep/crest/server/repositories/triggerdb/TriggerDb.java
407 crest_conn = TriggerCrestUtil.getCrestConnection(self.dbname)
408 if crest_conn is None:
409 raise RuntimeError(f"Cannot resolve CREST connection for dbname {self.dbname}")
410 hash = f"triggerdb://{crest_conn}/{self.configType.crestkey}/{self.dbkey}"
411 config = self._get_payload(hash=hash)
412 return config
413
414 # proposed filename when writing config to file
416 schema = TriggerCrestUtil.crestconn_dbname_mapping.get(self.dbname, self.dbname)
417 return "{basename}_{schema}_{dbkey}.json".format(basename = self.configType.basename, schema = schema, dbkey = self.dbkey)
418
420 """
421 base class to hold the configuration (dict)
422 and provides basic functions to access and print
423 """
424 def __init__(self, configType: ConfigType, mainkey: str, filename: str = "", jsonString: str = "",
425 dbalias: str = "", dbkey: int = 0, useCrest: bool = False, crestServer: str = ""):
426 self.loader: ConfigLoader = TriggerConfigAccess._getLoader(configType = configType, filename = filename, jsonString = jsonString, dbalias = dbalias, dbkey = dbkey,
427 useCrest=useCrest, crestServer=crestServer)
428 self._mainkey = mainkey
429 self._config = None
430
431 @staticmethod
432 def _isCrestConnection(dbalias: str, useCrest: bool, crestServer: str) -> dict[str, str]:
433 """ Determine whether the connection is a CREST connection
434 """
435 connParams: dict[str, str] = {}
436 if dbalias.startswith("http://") or dbalias.startswith("https://"):
437 # expect a complete crest connection string, consisting of server and dbname
438 if (match := re.match(r'^(http[s]?://[^/]+/api-v\d+\.\d+)/(.+)$', dbalias)):
439 connParams["crest_server"] = match.group(1)
440 connParams["dbname"] = match.group(2)
441 else:
442 raise RuntimeError(f"Invalid CREST connection string: {dbalias}")
443 elif dbalias in TriggerCrestUtil.allCrestConnections():
444 connParams["crest_server"] = crestServer
445 connParams["dbname"] = dbalias
446 elif useCrest:
447 connParams["crest_server"] = crestServer
448 if (dbname := TriggerCrestUtil.getCrestConnection(dbalias)) is not None:
449 connParams["dbname"] = dbname
450 else:
451 raise RuntimeError(f"Cannot resolve CREST connection '{dbalias}'")
452 return connParams
453
454 @staticmethod
455 def _getLoader(*, configType, filename: str = "", jsonString: str = "", dbalias: str = "", dbkey: int = 0,
456 useCrest: bool = False, crestServer: str = "") -> ConfigLoader:
457
458 if filename:
459 return ConfigFileLoader(configType, filename )
460 elif dbalias and dbkey>0:
461 if (connParams := TriggerConfigAccess._isCrestConnection(dbalias, useCrest, crestServer)):
462 return ConfigCrestLoader(configType=configType, dbname=connParams["dbname"], dbkey=dbkey, crestServer=connParams["crest_server"])
463 else:
464 return ConfigDBLoader(configType, dbalias, dbkey)
465 elif jsonString:
466 return ConfigDirectLoader(configType, jsonString )
467 else:
468 raise RuntimeError("Neither input file, nor JSON nor db alias and key provided")
469
470 def load(self) -> None:
471 self._config = self.loader.load()
472
473 def __str__(self):
474 return str(self._config)
475
476 def __iter__(self):
477 return iter(self[self._mainkey])
478
479 def __getitem__(self, item):
480 return self._config[item]
481
482 def __len__(self):
483 return len(self[self._mainkey])
484
485 def config(self):
486 """ returns the configuration """
487 return self._config
488
489 def prettyPrint(self):
490 if self._config:
491 print(json.dumps(self._config, indent = 4, separators=(',', ': ')))
492
493 def name(self):
494 return self["name"]
495
496 def filetype(self):
497 return self["filetype"]
498
499 def printSummary(self):
500 """ print summary info, should be overwritten by derived classes """
501 log.info("Configuration name: {0}".format(self.name()))
502 log.info("Configuration size: {0}".format(len(self)))
503
504 def writeFile(self, filename: str | None = None):
505 if filename is None:
506 filename = self.loader.getWriteFilename()
507 with open(filename, 'w') as fh:
508 json.dump(self.config(), fh, indent = 4, separators=(',', ': '))
509 log.info("Wrote file %s", filename)
void print(char *figname, TCanvas *c1)
#define max(a, b)
Definition cfImp.cxx:41
__init__(self, *, ConfigType configType, str dbname, int dbkey, str crestServer)
__init__(self, basename, filetype, crestkey)
__init__(self, ConfigType configType, str mainkey, str filename="", str jsonString="", str dbalias="", int dbkey=0, bool useCrest=False, str crestServer="")
dict[str, str] _isCrestConnection(str dbalias, bool useCrest, str crestServer)
ConfigLoader _getLoader(*, configType, str filename="", str jsonString="", str dbalias="", int dbkey=0, bool useCrest=False, str crestServer="")
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177