8 import xml.etree.ElementTree
as ET
9 from collections
import OrderedDict
as odict
12 from AthenaCommon.Logging
import logging
13 log = logging.getLogger(
'TriggerConfigAccessBase.py')
17 with open(filename,
'r')
as fp:
18 config = json.load(fp)
19 filetype = config[
'filetype']
24 NONE = (
"Config",
"None",
"None")
25 L1MENU = (
"L1Menu",
"l1menu",
"L1M")
26 HLTMENU = (
"HLTMenu",
"hltmenu",
"HLTM")
27 L1PS = (
"L1PrescalesSet",
"l1prescale",
"L1PS")
28 HLTPS = (
"HLTPrescalesSet",
"hltprescale",
"HLTPS")
29 BGS = (
"L1BunchGroupsSet",
"bunchgroupset",
"BGS")
30 HLTJO = (
"HLTJobOptions",
"joboptions",
"JO")
31 HLTMON = (
"HLTMonitoring",
"hltmonitoringsummary",
"MGS")
32 def __init__(self, basename, filetype, crestkey):
37 if isinstance(other,six.string_types):
40 return self.
filetype == other.filetype
42 return not self.
__eq__(other)
46 ConfigLoader derived classes hold the information of the configuration source
47 and define the method to load the configuration
53 checks that the in-file specification of the configuration type matches the expected type
56 raise RuntimeError(
"Can not load file with filetype '%s' when expecting '%s'" % (config[
'filetype'], self.
configType.filetype))
60 super(ConfigFileLoader,self).
__init__(configType)
64 config = json.load(fp, object_pairs_hook = odict)
70 outfn = os.path.basename(self.filename)
71 if outfn.endswith(
".json"):
72 outfn = outfn.rsplit(
'.',1)[0]
73 return outfn +
".out.json"
76 """Class to load from json string"""
78 super(ConfigDirectLoader,self).
__init__(configType)
81 config = json.loads(self.
jsonString, object_pairs_hook = odict)
89 class ConfigDBLoader(ConfigLoader):
90 def __init__(self, configType, dbalias, dbkey):
99 query template is a dictionary of queries, identified by schema version,
100 similar to TrigConf::TrigDBMenuLoader::m_hltQueries and TrigConf::TrigDBMenuLoader::m_l1Queries
106 """ looks for a readable file, first using filename as given, then in each directory listed in the environment variable pathenv"""
107 if os.access(filename,os.R_OK):
109 pathlist = os.getenv(pathenv,
'').
split(os.pathsep)
110 for path
in pathlist:
111 f = os.path.join( path, filename )
112 if os.access( f, os.R_OK ):
114 raise RuntimeError(
"Can't read file %s, neither locally nor in %s" % (filename, pathenv) )
118 dblookupFile = ConfigDBLoader.getResolvedFileName(
"dblookup.xml",
"CORAL_DBLOOKUP_PATH")
119 dbp = ET.parse(dblookupFile)
120 listOfServices =
None
121 for logSvc
in dbp.iter(
"logicalservice"):
122 if logSvc.attrib[
"name"] != dbalias:
124 listOfServices = [ serv.attrib[
"name"]
for serv
in logSvc.iter(
"service") ]
125 if len(listOfServices) == 0:
126 raise RuntimeError(
"DB %s has no services listed in %s" % (dbalias, dblookupFile))
128 if listOfServices
is None:
129 raise RuntimeError(
"DB %s not available in %s" % (dbalias, dblookupFile))
131 if "FRONTIER_SERVER" not in os.environ:
134 listOfServices = [svc
for svc
in listOfServices
if not svc.startswith(
"frontier:")]
137 credentials = odict().fromkeys(listOfServices)
139 for svc
in filter(
lambda s : s.startswith(
"frontier:"), listOfServices):
140 credentials[svc] = dict()
141 credentials[svc][
"user"] = svc
142 credentials[svc][
"password"] =
""
145 authFile = ConfigDBLoader.getResolvedFileName(
"authentication.xml",
"CORAL_AUTH_PATH")
146 except Exception
as e:
147 log.warning(
"File authentication.xml is not available! Oracle connection cannot be established. Exception message is: %s",e)
149 for svc
in filter(
lambda s : s.startswith(
"oracle:"), listOfServices):
150 ap = ET.parse(authFile)
152 for con
in filter(
lambda c: c.attrib[
"name"]==svc, ap.iter(
"connection")):
153 credentials[svc] = dict([(par.attrib[
"name"],par.attrib[
"value"])
for par
in con])
156 raise RuntimeError(
"No credentials found for connection %s from service %s for db %s" % (con,svc,dbalias))
158 raise RuntimeError(
"More than 1 connection found in %s for service %s" % (authFile, svc))
164 ''' Read schema from connection string '''
165 if connStr.startswith(
"oracle:"):
166 [_, schema] = connStr.split(
"/")[-2:]
169 if connStr.startswith(
"frontier:"):
171 pattern =
r"frontier://ATLF/\(\)/(.*)"
172 m = re.match(pattern, connStr)
174 raise RuntimeError(
"connection string '%s' doesn't match the pattern '%s'?" % (connStr, pattern))
175 (schema, ) = m.groups()
178 if connStr.startswith(
"sqlite_file:"):
179 raise NotImplementedError(
"Python-loading of trigger configuration from sqlite has not yet been implemented")
183 ''' Read schema version from database, based on TrigConf::TrigDBLoader::schemaVersion '''
185 q =
"SELECT TS_TAG FROM {schema}.TRIGGER_SCHEMA TS"
186 query = ConfigDBLoader.getCoralQuery(session, q.format(**qdict))
187 cursor = query.execute()
190 versionTag = cursor.currentRow()[0].
data()
192 versionTagPrefix =
"Trigger-Run3-Schema-v"
193 if not versionTag.startswith(versionTagPrefix):
194 raise RuntimeError(
"Tag format error: Trigger schema version tag %s does not start with %s", versionTag, versionTagPrefix)
196 vstr = versionTag[len(versionTagPrefix)]
198 if not vstr.isdigit():
199 raise RuntimeError(
"Invalid argument when interpreting the version part %s of schema tag %s is %s", vstr, versionTag,
type(vstr))
201 log.debug(
"Found schema version %s", vstr)
204 except Exception
as e:
205 log.warning(
"Failed to read schema version: %r", e)
209 ''' Parse output, tables and condition from the query string into coral query object'''
210 query = session.nominalSchema().newQuery()
212 if qdict
is not None:
213 queryStr = queryStr.format(**qdict)
216 bindVars = coral.AttributeList()
217 bindVarsInQuery = re.findall(
r":(\w*)", queryStr)
218 if len(bindVarsInQuery) > 0
and qdict
is None:
219 log.error(
"Query has bound-variable syntax but no value dictionary is provided. Query: %s", queryStr)
220 for k
in bindVarsInQuery:
221 bindVars.extend(k,
"int")
222 bindVars[k].setData(qdict[k])
224 output = queryStr.split(
"SELECT")[1].
split(
"FROM")[0]
225 for field
in output.split(
','):
226 query.addToOutputList(field)
228 log.debug(
"Conversion for Coral of query: %s", queryStr)
230 for table
in queryStr.split(
"FROM")[1].
split(
"WHERE")[0].
split(
","):
231 tableSplit =
list(
filter(
None, table.split(
" ")))
233 query.addToTableList(tableSplit[0].
split(
".")[1], tableSplit[1])
235 if "WHERE" in queryStr:
236 cond = queryStr.split(
"WHERE")[1]
237 m = re.match(
"(.*)(?i: ORDER *BY )(.*)", cond)
239 where, order = m.groups()
240 query.setCondition(where, bindVars)
241 query.addToOrderList(order)
243 query.setCondition(cond, bindVars)
248 '''Choose query based on schema version, based on TrigConf::TrigDBLoader::getQueryDefinition '''
251 if vkey>maxDefVersion
and vkey<=schemaVersion:
254 if maxDefVersion == 0:
255 raise RuntimeError(
"No query available for schema version {0}".
format(schemaVersion))
257 return self.
query[maxDefVersion]
260 credentials: dict[str,Any] = ConfigDBLoader.getConnectionParameters(self.
dbalias)
263 log.error(
"No TriggerDB connections found for %s", self.
dbalias)
264 raise RuntimeError(f
"No TriggerDB connections found for {self.dbalias}")
266 svc = coral.ConnectionService()
267 svcconfig = svc.configuration()
268 svcconfig.disablePoolAutomaticCleanUp()
269 svcconfig.setConnectionTimeOut(0)
272 for credential
in credentials:
273 log.debug(
"Trying credentials %s",credential)
276 session = svc.connect(credential, coral.access_ReadOnly)
277 except Exception
as e:
278 log.warning(
"Failed to establish connection: %s",e)
279 failureMode =
max(1, failureMode)
283 if 'FRONTIER_SERVER' in os.environ
and os.environ[
'FRONTIER_SERVER']:
284 svcconfig.setConnectionRetrialPeriod(300)
285 svcconfig.setConnectionRetrialTimeOut(3600)
287 svcconfig.setConnectionRetrialPeriod(1)
288 svcconfig.setConnectionRetrialTimeOut(1)
291 session.transaction().
start(
True)
292 self.
schema = ConfigDBLoader.getSchema(credential)
293 qdict = {
"schema" : self.
schema,
"dbkey" : self.
dbkey }
296 schemaVersion = ConfigDBLoader.readSchemaVersion(qdict, session)
299 query = ConfigDBLoader.getCoralQuery(session, qstr, qdict)
300 cursor = query.execute()
302 except Exception
as e:
303 log.warning(f
"DB query on {credential} failed to execute.")
304 log.warning(
"Exception message: %r", e)
305 failureMode =
max(2, failureMode)
309 if not cursor.next():
311 log.warning(f
"DB query on {credential} returned empty result, likely due to non-existing key {self.dbkey}")
315 configblob = cursor.currentRow()[0].
data()
316 if type(configblob)
is not str:
317 configblob = configblob.readline()
318 config = json.loads(configblob, object_pairs_hook = odict)
319 session.transaction().
commit()
325 log.error(
"TriggerDB query: could not connect to any source for %s", self.
configType.basename)
326 log.error(
"Considered sources: %s",
", ".
join(credentials))
327 raise RuntimeError(
"TriggerDB query: could not connect to any source", self.
configType.basename)
329 log.error(
"Query failed due to wrong definition for %s", self.
configType.basename)
330 log.error(
"DB query was: %s", qstr.format(**qdict))
331 raise RuntimeError(
"Query failed due to wrong definition", self.
configType.basename)
332 elif failureMode == 3:
333 log.error(
"DB key %s does not exist for %s", self.
dbkey, self.
configType.basename)
334 raise KeyError(
"DB key does not exist", self.
dbkey, self.
configType.basename)
336 raise RuntimeError(
"Query failed for unknown reason")
343 def __init__(self, *, configType, dbalias, dbkey, crestServer):
352 With CREST all queries are defined in the CREST server
357 """get payload from crest server using the requests library
360 hash (str): the query part of the url as required by the REST api
363 RuntimeError: when connection or query failed
366 dict: the json content
370 url = f
"{self.crestServer}/payloads/data"
375 preq = requests.Request(method=
'GET', url=url, params=params).prepare()
376 with requests.Session()
as session:
378 resp = session.send(preq)
379 except requests.ConnectionError
as exc:
380 log.error(f
"Could not connect to crest server {self.crestServer} ({exc})")
381 raise RuntimeError(f
"Could not connect to CREST server {self.crestServer}")
383 if resp.status_code != 200:
384 log.error(f
"Error: HTTP GET request '{preq.url}' failed")
385 raise RuntimeError(f
"Query {hash} to crest failed with status code {resp.status_code}")
387 config = json.loads(resp.content, object_pairs_hook = odict)
388 self.confirmConfigType(config)
393 dblookupFile = ConfigDBLoader.getResolvedFileName(
"dblookup.xml",
"CORAL_DBLOOKUP_PATH")
394 dbp = ET.parse(dblookupFile)
395 for logSvc
in dbp.iter(
"logicalservice"):
396 if logSvc.attrib[
"name"] != dbalias:
399 for serv
in logSvc.iter(
"service"):
400 if serv.attrib[
"name"].startswith(
"oracle://"):
401 oracleService = serv.attrib[
"name"]
402 oracleServer = oracleService.split(
'/')[3]
404 raise RuntimeError(f
"DB {dbalias} has no oracle services listed in {dblookupFile}")
405 raise RuntimeError(f
"DB {dbalias} is not listed in {dblookupFile}")
409 self.
schema = ConfigCrestLoader._getDBSchemaName(self.
dbalias)
411 "ATLAS_CONF_TRIGGER_RUN3":
"CONF_DATA_RUN3",
412 "ATLAS_CONF_TRIGGER_MC_RUN3":
"CONF_MC_RUN3",
413 "ATLAS_CONF_TRIGGER_REPR_RUN3":
"CONF_REPR_RUN3",
415 if url_schema
is None:
416 raise RuntimeError(f
"Oracle server {self.schema} is not implemented in the crest server {self.crestServer}")
417 hash = f
"triggerdb://{url_schema}/{self.configType.crestkey}/{self.dbkey}"
427 base class that holds the configuration (OrderedDict)
428 and provides basic functions to access and print it
430 def __init__(self, configType, mainkey, filename = None, jsonString = None, dbalias = None, dbkey = None, useCrest=False, crestServer=""):
431 self.
_getLoader(configType = configType, filename = filename, jsonString = jsonString, dbalias = dbalias, dbkey = dbkey,
432 useCrest=useCrest, crestServer=crestServer)
436 def _getLoader(self, *, configType, filename = None, jsonString = None, dbalias = None, dbkey = None,
437 useCrest:bool =
False, crestServer:str =
""):
440 elif dbalias
and dbkey:
448 raise RuntimeError(
"Neither input file, nor JSON nor db alias and key provided")
466 """ returns the configuration """
471 print(json.dumps(self.
_config, indent = 4, separators=(
',',
': ')))
477 return self[
"filetype"]
480 """ print summary info, should be overwritten by derived classes """
481 log.info(
"Configuration name: {0}".
format(self.
name()))
482 log.info(
"Configuration size: {0}".
format(len(self)))
486 filename: str = self.
loader.getWriteFilename()
487 with open(filename,
'w')
as fh:
488 json.dump(self.
config(), fh, indent = 4, separators=(
',',
': '))
489 log.info(
"Wrote file %s", filename)