3 from abc
import ABC, abstractmethod
8 import xml.etree.ElementTree
as ET
9 from TrigConfStorage.TriggerCrestUtil
import TriggerCrestUtil
12 from AthenaCommon.Logging
import logging
13 log = logging.getLogger(
'TriggerConfigAccessBase.py')
17 with open(filename,
'r')
as fp:
18 config = json.load(fp)
19 filetype = config[
'filetype']
24 NONE = (
"Config",
"None",
"None")
25 L1MENU = (
"L1Menu",
"l1menu",
"L1M")
26 HLTMENU = (
"HLTMenu",
"hltmenu",
"HLTM")
27 L1PS = (
"L1PrescalesSet",
"l1prescale",
"L1PS")
28 HLTPS = (
"HLTPrescalesSet",
"hltprescale",
"HLTPS")
29 BGS = (
"L1BunchGroupsSet",
"bunchgroupset",
"BGS")
30 HLTJO = (
"HLTJobOptions",
"joboptions",
"JO")
31 HLTMON = (
"HLTMonitoring",
"hltmonitoringsummary",
"MGS")
32 def __init__(self, basename, filetype, crestkey):
37 if isinstance(other,str):
40 return self.
filetype == other.filetype
42 return not self.
__eq__(other)
46 ConfigLoader derived classes hold the information of the configuration source
47 and define the method to load the configuration
50 self.configType: ConfigType = configType
53 checks that the in-file specification of the configuration type matches the expected type
55 if config[
'filetype'] != self.configType:
56 raise RuntimeError(
"Can not load file with filetype '%s' when expecting '%s'" % (config[
'filetype'], self.configType.filetype))
59 def load(self) -> dict[str, Any]:
70 class ConfigFileLoader(ConfigLoader):
72 super(ConfigFileLoader,self).
__init__(configType)
74 def load(self) -> dict[str, Any]:
76 config = json.load(fp)
82 outfn = os.path.basename(self.filename)
83 if outfn.endswith(
".json"):
84 outfn = outfn.rsplit(
'.',1)[0]
85 return outfn +
".out.json"
88 """Class to load from json string"""
90 super(ConfigDirectLoader,self).
__init__(configType)
92 def load(self) -> dict[str, Any]:
101 class ConfigDBLoader(ConfigLoader):
111 query template is a dictionary of queries, identified by schema version,
112 similar to TrigConf::TrigDBMenuLoader::m_hltQueries and TrigConf::TrigDBMenuLoader::m_l1Queries
118 """ looks for file, first absolute, then by resolving envvar pathenv"""
119 if os.access(filename,os.R_OK):
121 pathlist = os.getenv(pathenv,
'').
split(os.pathsep)
122 for path
in pathlist:
123 f = os.path.join( path, filename )
124 if os.access( f, os.R_OK ):
126 raise RuntimeError(
"Can't read file %s, neither locally nor in %s" % (filename, pathenv) )
130 dblookupFile = ConfigDBLoader.getResolvedFileName(
"dblookup.xml",
"CORAL_DBLOOKUP_PATH")
131 dbp = ET.parse(dblookupFile)
134 for logSvc
in dbp.iter(
"logicalservice"):
135 if logSvc.attrib[
"name"] != dbalias:
138 listOfServices = [ serv.attrib[
"name"]
for serv
in logSvc.iter(
"service") ]
139 if len(listOfServices) == 0:
140 raise RuntimeError(
"DB %s has no services listed in %s" % (dbalias, dblookupFile))
143 raise RuntimeError(
"DB %s not available in %s" % (dbalias, dblookupFile))
145 if "FRONTIER_SERVER" not in os.environ:
148 listOfServices: list[str] = [svc
for svc
in listOfServices
if not svc.startswith(
"frontier:")]
151 credentials: dict[str, Any] = dict.fromkeys(listOfServices)
153 for svc
in filter(
lambda s : s.startswith(
"frontier:"), listOfServices):
154 credentials[svc] = {}
155 credentials[svc][
"user"] = svc
156 credentials[svc][
"password"] =
""
159 authFile = ConfigDBLoader.getResolvedFileName(
"authentication.xml",
"CORAL_AUTH_PATH")
160 except Exception
as e:
161 log.warning(
"File authentication.xml is not available! Oracle connection cannot be established. Exception message is: %s",e)
163 for svc
in filter(
lambda s : s.startswith(
"oracle:"), listOfServices):
164 ap = ET.parse(authFile)
166 for con
in filter(
lambda c: c.attrib[
"name"]==svc, ap.iter(
"connection")):
167 credentials[svc] = dict([(par.attrib[
"name"],par.attrib[
"value"])
for par
in con])
170 raise RuntimeError(
"No credentials found for connection %s from service %s for db %s" % (con,svc,dbalias))
172 raise RuntimeError(
"More than 1 connection found in %s for service %s" % (authFile, svc))
178 ''' Read schema from connection string '''
179 if connStr.startswith(
"oracle:"):
180 [_, schema] = connStr.split(
"/")[-2:]
183 if connStr.startswith(
"frontier:"):
185 pattern =
r"frontier://ATLF/\(\)/(.*)"
186 m = re.match(pattern, connStr)
188 raise RuntimeError(
"connection string '%s' doesn't match the pattern '%s'?" % (connStr, pattern))
189 (schema, ) = m.groups()
192 if connStr.startswith(
"sqlite_file:"):
193 raise NotImplementedError(
"Python-loading of trigger configuration from sqlite has not yet been implemented")
197 ''' Read schema version from database, based on TrigConf::TrigDBLoader::schemaVersion '''
199 q =
"SELECT TS_TAG FROM {schema}.TRIGGER_SCHEMA TS"
200 query = ConfigDBLoader.getCoralQuery(session, q.format(**qdict))
201 cursor = query.execute()
204 versionTag = cursor.currentRow()[0].
data()
# Validate the schema tag read from the database and extract its version part.
versionTagPrefix = "Trigger-Run3-Schema-v"
if not versionTag.startswith(versionTagPrefix):
    # Exceptions do not interpolate logging-style arguments; format the message explicitly
    # so the offending tag actually appears in the error text.
    raise RuntimeError("Tag format error: Trigger schema version tag %s does not start with %s" % (versionTag, versionTagPrefix))

# NOTE(review): only the single character following the prefix is used, so a
# multi-digit schema version would be truncated — confirm whether a slice is intended.
vstr = versionTag[len(versionTagPrefix)]

if not vstr.isdigit():
    raise RuntimeError("Invalid argument when interpreting the version part %s of schema tag %s is %s" % (vstr, versionTag, type(vstr)))

log.debug("Found schema version %s", vstr)
218 except Exception
as e:
219 log.warning(
"Failed to read schema version: %r", e)
223 ''' Parse output, tables and condition from the query string into coral query object'''
224 query = session.nominalSchema().newQuery()
226 if qdict
is not None:
227 queryStr = queryStr.format(**qdict)
# Collect the bind variables (":name" syntax) used in the query and attach their values.
bindVars = coral.AttributeList()
bindVarsInQuery = re.findall(r":(\w*)", queryStr)
if bindVarsInQuery and qdict is None:
    log.error("Query has bound-variable syntax but no value dictionary is provided. Query: %s", queryStr)
    # Without a value dictionary the loop below could only fail with an opaque
    # TypeError on qdict[k]; stop here with a message that names the actual problem.
    raise RuntimeError("Query has bound-variable syntax but no value dictionary is provided. Query: %s" % queryStr)
for k in bindVarsInQuery:
    # every bind variable is declared with type "int" and filled from qdict
    bindVars.extend(k, "int")
    bindVars[k].setData(qdict[k])
238 output = queryStr.split(
"SELECT")[1].
split(
"FROM")[0]
239 for field
in output.split(
','):
240 query.addToOutputList(field)
242 log.debug(
"Conversion for Coral of query: %s", queryStr)
244 for table
in queryStr.split(
"FROM")[1].
split(
"WHERE")[0].
split(
","):
245 tableSplit =
list(
filter(
None, table.split(
" ")))
247 query.addToTableList(tableSplit[0].
split(
".")[1], tableSplit[1])
249 if "WHERE" in queryStr:
250 cond = queryStr.split(
"WHERE")[1]
251 m = re.match(
"(.*)(?i: ORDER *BY )(.*)", cond)
253 where, order = m.groups()
254 query.setCondition(where, bindVars)
255 query.addToOrderList(order)
257 query.setCondition(cond, bindVars)
262 '''Choose query based on schema version, based on TrigConf::TrigDBLoader::getQueryDefinition '''
265 if vkey>maxDefVersion
and vkey<=schemaVersion:
268 if maxDefVersion == 0:
269 raise RuntimeError(
"No query available for schema version {0}".
format(schemaVersion))
271 return self.
query[maxDefVersion]
273 def load(self) -> dict[str, Any]:
274 credentials: dict[str,Any] = ConfigDBLoader.getConnectionParameters(self.
dbalias)
277 log.error(
"No TriggerDB connections found for %s", self.
dbalias)
278 raise RuntimeError(f
"No TriggerDB connections found for {self.dbalias}")
280 svc = coral.ConnectionService()
281 svcconfig = svc.configuration()
282 svcconfig.disablePoolAutomaticCleanUp()
283 svcconfig.setConnectionTimeOut(0)
286 for credential
in credentials:
287 log.debug(
"Trying credentials %s",credential)
290 session = svc.connect(credential, coral.access_ReadOnly)
291 except Exception
as e:
292 log.warning(
"Failed to establish connection: %s",e)
293 failureMode =
max(1, failureMode)
297 if 'FRONTIER_SERVER' in os.environ
and os.environ[
'FRONTIER_SERVER']:
298 svcconfig.setConnectionRetrialPeriod(300)
299 svcconfig.setConnectionRetrialTimeOut(3600)
301 svcconfig.setConnectionRetrialPeriod(1)
302 svcconfig.setConnectionRetrialTimeOut(1)
305 session.transaction().
start(
True)
306 self.
schema = ConfigDBLoader.getSchema(credential)
307 qdict = {
"schema" : self.
schema,
"dbkey" : self.
dbkey }
310 schemaVersion = ConfigDBLoader.readSchemaVersion(qdict, session)
313 query = ConfigDBLoader.getCoralQuery(session, qstr, qdict)
314 cursor = query.execute()
316 except Exception
as e:
317 log.warning(f
"DB query on {credential} failed to execute.")
318 log.warning(
"Exception message: %r", e)
319 failureMode =
max(2, failureMode)
323 if not cursor.next():
325 log.warning(f
"DB query on {credential} returned empty result, likely due to non-existing key {self.dbkey}")
# Decode the first column of the current row into the configuration dictionary.
configblob = cursor.currentRow()[0].data()
# Payloads that are not plain strings are read via readline();
# presumably these are blob-like objects — TODO confirm against the coral API.
if not isinstance(configblob, str):
    configblob = configblob.readline()
config = json.loads(configblob)
session.transaction().commit()
339 log.error(
"TriggerDB query: could not connect to any source for %s", self.configType.basename)
340 log.error(
"Considered sources: %s",
", ".
join(credentials))
341 raise RuntimeError(
"TriggerDB query: could not connect to any source", self.configType.basename)
343 log.error(
"Query failed due to wrong definition for %s", self.configType.basename)
344 log.error(
"DB query was: %s", qstr.format(**qdict))
345 raise RuntimeError(
"Query failed due to wrong definition", self.configType.basename)
346 elif failureMode == 3:
347 log.error(
"DB key %s does not exist for %s", self.
dbkey, self.configType.basename)
348 raise KeyError(
"DB key does not exist", self.
dbkey, self.configType.basename)
350 raise RuntimeError(
"Query failed for unknown reason")
354 return "{basename}_{schema}_{dbkey}.json".
format(basename = self.configType.basename, schema = self.
schema, dbkey = self.
dbkey)
357 def __init__(self, *, configType: ConfigType, dbname: str, dbkey: int, crestServer: str):
366 With CREST all queries are defined in the CREST server
371 """get payload from crest server using the requests library
374 hash (str): the query part of the url as required by the REST api
377 RuntimeError: when connection or query failed
380 dict: the json content
384 url = f
"{self.crestServer}/payloads/data"
389 preq = requests.Request(method=
'GET', url=url, params=params).prepare()
390 with requests.Session()
as session:
392 resp = session.send(preq)
393 except requests.ConnectionError
as exc:
394 log.error(f
"Could not connect to crest server {self.crestServer} ({exc})")
395 raise RuntimeError(f
"Could not connect to CREST server {self.crestServer}")
397 if resp.status_code != 200:
398 log.error(f
"Error: HTTP GET request '{preq.url}' failed")
399 raise RuntimeError(f
"Query {hash} to crest failed with status code {resp.status_code}")
401 config = json.loads(resp.content)
402 self.confirmConfigType(config)
405 def load(self) -> dict[str, Any]:
407 crest_conn = TriggerCrestUtil.getCrestConnection(self.
dbname)
408 if crest_conn
is None:
409 raise RuntimeError(f
"Cannot resolve CREST connection for dbname {self.dbname}")
410 hash = f
"triggerdb://{crest_conn}/{self.configType.crestkey}/{self.dbkey}"
416 schema = TriggerCrestUtil.crestconn_dbname_mapping.get(self.
dbname, self.
dbname)
417 return "{basename}_{schema}_{dbkey}.json".
format(basename = self.configType.basename, schema = schema, dbkey = self.
dbkey)
421 base class that holds the configuration (dict)
422 and provides basic functions to access and print it
424 def __init__(self, configType: ConfigType, mainkey: str, filename: str =
"", jsonString: str =
"",
425 dbalias: str =
"", dbkey: int = 0, useCrest: bool =
False, crestServer: str =
""):
426 self.loader: ConfigLoader = TriggerConfigAccess._getLoader(configType = configType, filename = filename, jsonString = jsonString, dbalias = dbalias, dbkey = dbkey,
427 useCrest=useCrest, crestServer=crestServer)
433 """ Determine whether the connection is a CREST connection
435 connParams: dict[str, str] = {}
436 if dbalias.startswith(
"http://")
or dbalias.startswith(
"https://"):
438 if (match := re.match(
r'^(http[s]?://[^/]+/api-v\d+\.\d+)/(.+)$', dbalias)):
439 connParams[
"crest_server"] = match.group(1)
440 connParams[
"dbname"] = match.group(2)
442 raise RuntimeError(f
"Invalid CREST connection string: {dbalias}")
443 elif dbalias
in TriggerCrestUtil.allCrestConnections():
444 connParams[
"crest_server"] = crestServer
445 connParams[
"dbname"] = dbalias
447 connParams[
"crest_server"] = crestServer
448 if (dbname := TriggerCrestUtil.getCrestConnection(dbalias))
is not None:
449 connParams[
"dbname"] = dbname
451 raise RuntimeError(f
"Cannot resolve CREST connection '{dbalias}'")
455 def _getLoader(*, configType, filename: str =
"", jsonString: str =
"", dbalias: str =
"", dbkey: int = 0,
456 useCrest: bool =
False, crestServer: str =
"") -> ConfigLoader:
460 elif dbalias
and dbkey>0:
461 if (connParams := TriggerConfigAccess._isCrestConnection(dbalias, useCrest, crestServer)):
462 return ConfigCrestLoader(configType=configType, dbname=connParams[
"dbname"], dbkey=dbkey, crestServer=connParams[
"crest_server"])
468 raise RuntimeError(
"Neither input file, nor JSON nor db alias and key provided")
486 """ returns the configuration """
491 print(json.dumps(self.
_config, indent = 4, separators=(
',',
': ')))
497 return self[
"filetype"]
500 """ print summary info, should be overridden by derived classes """
501 log.info(
"Configuration name: {0}".
format(self.
name()))
502 log.info(
"Configuration size: {0}".
format(len(self)))
506 filename = self.loader.getWriteFilename()
507 with open(filename,
'w')
as fh:
508 json.dump(self.
config(), fh, indent = 4, separators=(
',',
': '))
509 log.info(
"Wrote file %s", filename)