7 import xml.etree.ElementTree
as ET
10 from AthenaCommon.Logging
import logging
11 log = logging.getLogger(
'TriggerConfigAccessBase.py')
15 with open(filename,
'r')
as fp:
16 config = json.load(fp)
17 filetype = config[
'filetype']
22 NONE = (
"Config",
"None",
"None")
23 L1MENU = (
"L1Menu",
"l1menu",
"L1M")
24 HLTMENU = (
"HLTMenu",
"hltmenu",
"HLTM")
25 L1PS = (
"L1PrescalesSet",
"l1prescale",
"L1PS")
26 HLTPS = (
"HLTPrescalesSet",
"hltprescale",
"HLTPS")
27 BGS = (
"L1BunchGroupsSet",
"bunchgroupset",
"BGS")
28 HLTJO = (
"HLTJobOptions",
"joboptions",
"JO")
29 HLTMON = (
"HLTMonitoring",
"hltmonitoringsummary",
"MGS")
30 def __init__(self, basename, filetype, crestkey):
35 if isinstance(other,str):
38 return self.
filetype == other.filetype
40 return not self.
__eq__(other)
44 ConfigLoader derived classes hold the information of the configuration source
45 and define the method to load the configuration
51 checks that the in-file specification of the configuration type matches the expected type
54 raise RuntimeError(
"Can not load file with filetype '%s' when expecting '%s'" % (config[
'filetype'], self.
configType.filetype))
58 super(ConfigFileLoader,self).
__init__(configType)
62 config = json.load(fp)
68 outfn = os.path.basename(self.filename)
69 if outfn.endswith(
".json"):
70 outfn = outfn.rsplit(
'.',1)[0]
71 return outfn +
".out.json"
74 """Class to load from json string"""
76 super(ConfigDirectLoader,self).
__init__(configType)
87 class ConfigDBLoader(ConfigLoader):
88 def __init__(self, configType, dbalias, dbkey):
97 query template is a dictionary of queries, identified by schema version,
98 similar to TrigConf::TrigDBMenuLoader::m_hltQueries and TrigConf::TrigDBMenuLoader::m_l1Queries
104 """ looks for file, first absolute, then by resolving envvar pathenv"""
105 if os.access(filename,os.R_OK):
107 pathlist = os.getenv(pathenv,
'').
split(os.pathsep)
108 for path
in pathlist:
109 f = os.path.join( path, filename )
110 if os.access( f, os.R_OK ):
112 raise RuntimeError(
"Can't read file %s, neither locally nor in %s" % (filename, pathenv) )
116 dblookupFile = ConfigDBLoader.getResolvedFileName(
"dblookup.xml",
"CORAL_DBLOOKUP_PATH")
117 dbp = ET.parse(dblookupFile)
118 listOfServices =
None
119 for logSvc
in dbp.iter(
"logicalservice"):
120 if logSvc.attrib[
"name"] != dbalias:
122 listOfServices = [ serv.attrib[
"name"]
for serv
in logSvc.iter(
"service") ]
123 if len(listOfServices) == 0:
124 raise RuntimeError(
"DB %s has no services listed in %s" % (dbalias, dblookupFile))
126 if listOfServices
is None:
127 raise RuntimeError(
"DB %s not available in %s" % (dbalias, dblookupFile))
129 if "FRONTIER_SERVER" not in os.environ:
132 listOfServices = [svc
for svc
in listOfServices
if not svc.startswith(
"frontier:")]
135 credentials = dict.fromkeys(listOfServices)
137 for svc
in filter(
lambda s : s.startswith(
"frontier:"), listOfServices):
138 credentials[svc] = dict()
139 credentials[svc][
"user"] = svc
140 credentials[svc][
"password"] =
""
143 authFile = ConfigDBLoader.getResolvedFileName(
"authentication.xml",
"CORAL_AUTH_PATH")
144 except Exception
as e:
145 log.warning(
"File authentication.xml is not available! Oracle connection cannot be established. Exception message is: %s",e)
147 for svc
in filter(
lambda s : s.startswith(
"oracle:"), listOfServices):
148 ap = ET.parse(authFile)
150 for con
in filter(
lambda c: c.attrib[
"name"]==svc, ap.iter(
"connection")):
151 credentials[svc] = dict([(par.attrib[
"name"],par.attrib[
"value"])
for par
in con])
154 raise RuntimeError(
"No credentials found for connection %s from service %s for db %s" % (con,svc,dbalias))
156 raise RuntimeError(
"More than 1 connection found in %s for service %s" % (authFile, svc))
162 ''' Read schema from connection string '''
163 if connStr.startswith(
"oracle:"):
164 [_, schema] = connStr.split(
"/")[-2:]
167 if connStr.startswith(
"frontier:"):
169 pattern =
r"frontier://ATLF/\(\)/(.*)"
170 m = re.match(pattern, connStr)
172 raise RuntimeError(
"connection string '%s' doesn't match the pattern '%s'?" % (connStr, pattern))
173 (schema, ) = m.groups()
176 if connStr.startswith(
"sqlite_file:"):
177 raise NotImplementedError(
"Python-loading of trigger configuration from sqlite has not yet been implemented")
181 ''' Read schema version from database, based on TrigConf::TrigDBLoader::schemaVersion '''
183 q =
"SELECT TS_TAG FROM {schema}.TRIGGER_SCHEMA TS"
184 query = ConfigDBLoader.getCoralQuery(session, q.format(**qdict))
185 cursor = query.execute()
188 versionTag = cursor.currentRow()[0].
data()
# Validate the schema tag read from the TRIGGER_SCHEMA table and isolate the
# version part that follows the fixed prefix.
versionTagPrefix = "Trigger-Run3-Schema-v"
if not versionTag.startswith(versionTagPrefix):
    # Exceptions, unlike logging calls, do not interpolate %-style arguments:
    # the message has to be formatted before it is handed to RuntimeError.
    raise RuntimeError(
        "Tag format error: Trigger schema version tag %s does not start with %s"
        % (versionTag, versionTagPrefix))

# NOTE(review): only the single character directly after the prefix is read,
# so a two-digit schema version would be truncated and a tag equal to the bare
# prefix raises IndexError -- confirm whether a slice is intended here.
vstr = versionTag[len(versionTagPrefix)]

if not vstr.isdigit():
    raise RuntimeError(
        "Invalid argument when interpreting the version part %s of schema tag %s is %s"
        % (vstr, versionTag, type(vstr)))
199 log.debug(
"Found schema version %s", vstr)
202 except Exception
as e:
203 log.warning(
"Failed to read schema version: %r", e)
207 ''' Parse output, tables and condition from the query string into coral query object'''
208 query = session.nominalSchema().newQuery()
210 if qdict
is not None:
211 queryStr = queryStr.format(**qdict)
# Collect the ":name" bind variables referenced in the query string and attach
# their values (taken from qdict) to a coral attribute list.
bindVars = coral.AttributeList()
# \w+ rather than \w*: a ":" that is not followed by a name must not produce
# an empty bind-variable name.
bindVarsInQuery = re.findall(r":(\w+)", queryStr)
if bindVarsInQuery and qdict is None:
    log.error("Query has bound-variable syntax but no value dictionary is provided. Query: %s", queryStr)
    # Without a value dictionary the lookup below cannot succeed; fail here
    # with an explicit message instead of a TypeError on qdict[k].
    raise RuntimeError(
        "Query has bound-variable syntax but no value dictionary is provided. Query: %s" % queryStr)
for k in bindVarsInQuery:
    # every bind variable is declared with coral type "int"
    bindVars.extend(k, "int")
    bindVars[k].setData(qdict[k])
222 output = queryStr.split(
"SELECT")[1].
split(
"FROM")[0]
223 for field
in output.split(
','):
224 query.addToOutputList(field)
226 log.debug(
"Conversion for Coral of query: %s", queryStr)
228 for table
in queryStr.split(
"FROM")[1].
split(
"WHERE")[0].
split(
","):
229 tableSplit =
list(
filter(
None, table.split(
" ")))
231 query.addToTableList(tableSplit[0].
split(
".")[1], tableSplit[1])
233 if "WHERE" in queryStr:
234 cond = queryStr.split(
"WHERE")[1]
235 m = re.match(
"(.*)(?i: ORDER *BY )(.*)", cond)
237 where, order = m.groups()
238 query.setCondition(where, bindVars)
239 query.addToOrderList(order)
241 query.setCondition(cond, bindVars)
246 '''Choose query based on schema version, based on TrigConf::TrigDBLoader::getQueryDefinition '''
249 if vkey>maxDefVersion
and vkey<=schemaVersion:
252 if maxDefVersion == 0:
253 raise RuntimeError(
"No query available for schema version {0}".
format(schemaVersion))
255 return self.
query[maxDefVersion]
258 credentials: dict[str,Any] = ConfigDBLoader.getConnectionParameters(self.
dbalias)
261 log.error(
"No TriggerDB connections found for %s", self.
dbalias)
262 raise RuntimeError(f
"No TriggerDB connections found for {self.dbalias}")
264 svc = coral.ConnectionService()
265 svcconfig = svc.configuration()
266 svcconfig.disablePoolAutomaticCleanUp()
267 svcconfig.setConnectionTimeOut(0)
270 for credential
in credentials:
271 log.debug(
"Trying credentials %s",credential)
274 session = svc.connect(credential, coral.access_ReadOnly)
275 except Exception
as e:
276 log.warning(
"Failed to establish connection: %s",e)
277 failureMode =
max(1, failureMode)
281 if 'FRONTIER_SERVER' in os.environ
and os.environ[
'FRONTIER_SERVER']:
282 svcconfig.setConnectionRetrialPeriod(300)
283 svcconfig.setConnectionRetrialTimeOut(3600)
285 svcconfig.setConnectionRetrialPeriod(1)
286 svcconfig.setConnectionRetrialTimeOut(1)
289 session.transaction().
start(
True)
290 self.
schema = ConfigDBLoader.getSchema(credential)
291 qdict = {
"schema" : self.
schema,
"dbkey" : self.
dbkey }
294 schemaVersion = ConfigDBLoader.readSchemaVersion(qdict, session)
297 query = ConfigDBLoader.getCoralQuery(session, qstr, qdict)
298 cursor = query.execute()
300 except Exception
as e:
301 log.warning(f
"DB query on {credential} failed to execute.")
302 log.warning(
"Exception message: %r", e)
303 failureMode =
max(2, failureMode)
307 if not cursor.next():
309 log.warning(f
"DB query on {credential} returned empty result, likely due to non-existing key {self.dbkey}")
# Take the payload from the first column of the current result row, parse it
# as JSON and close the transaction.
configblob = cursor.currentRow()[0].data()
# A payload that is not already a string is read through its readline() method
# (presumably a coral blob object -- confirm against the coral API).
if not isinstance(configblob, str):
    configblob = configblob.readline()
config = json.loads(configblob)
session.transaction().commit()
323 log.error(
"TriggerDB query: could not connect to any source for %s", self.
configType.basename)
324 log.error(
"Considered sources: %s",
", ".
join(credentials))
325 raise RuntimeError(
"TriggerDB query: could not connect to any source", self.
configType.basename)
327 log.error(
"Query failed due to wrong definition for %s", self.
configType.basename)
328 log.error(
"DB query was: %s", qstr.format(**qdict))
329 raise RuntimeError(
"Query failed due to wrong definition", self.
configType.basename)
330 elif failureMode == 3:
331 log.error(
"DB key %s does not exist for %s", self.
dbkey, self.
configType.basename)
332 raise KeyError(
"DB key does not exist", self.
dbkey, self.
configType.basename)
334 raise RuntimeError(
"Query failed for unknown reason")
341 def __init__(self, *, configType, dbalias, dbkey, crestServer):
350 With CREST all queries are defined in the CREST server
355 """get payload from crest server using the requests library
358 hash (str): the query part of the url as required by the REST api
361 RuntimeError: when connection or query failed
364 dict: the json content
368 url = f
"{self.crestServer}/payloads/data"
373 preq = requests.Request(method=
'GET', url=url, params=params).prepare()
374 with requests.Session()
as session:
376 resp = session.send(preq)
377 except requests.ConnectionError
as exc:
378 log.error(f
"Could not connect to crest server {self.crestServer} ({exc})")
379 raise RuntimeError(f
"Could not connect to CREST server {self.crestServer}")
381 if resp.status_code != 200:
382 log.error(f
"Error: HTTP GET request '{preq.url}' failed")
383 raise RuntimeError(f
"Query {hash} to crest failed with status code {resp.status_code}")
385 config = json.loads(resp.content)
386 self.confirmConfigType(config)
391 dblookupFile = ConfigDBLoader.getResolvedFileName(
"dblookup.xml",
"CORAL_DBLOOKUP_PATH")
392 dbp = ET.parse(dblookupFile)
393 for logSvc
in dbp.iter(
"logicalservice"):
394 if logSvc.attrib[
"name"] != dbalias:
397 for serv
in logSvc.iter(
"service"):
398 if serv.attrib[
"name"].startswith(
"oracle://"):
399 oracleService = serv.attrib[
"name"]
400 oracleServer = oracleService.split(
'/')[3]
402 raise RuntimeError(f
"DB {dbalias} has no oracle services listed in {dblookupFile}")
403 raise RuntimeError(f
"DB {dbalias} is not listed in {dblookupFile}")
407 self.
schema = ConfigCrestLoader._getDBSchemaName(self.
dbalias)
409 "ATLAS_CONF_TRIGGER_RUN3":
"CONF_DATA_RUN3",
410 "ATLAS_CONF_TRIGGER_MC_RUN3":
"CONF_MC_RUN3",
411 "ATLAS_CONF_TRIGGER_REPR_RUN3":
"CONF_REPR_RUN3",
413 if url_schema
is None:
414 raise RuntimeError(f
"Oracle server {self.schema} is not implemented in the crest server {self.crestServer}")
415 hash = f
"triggerdb://{url_schema}/{self.configType.crestkey}/{self.dbkey}"
425 base class that holds the configuration (dict)
426 and provides basic functions to access and print it
428 def __init__(self, configType, mainkey, filename = None, jsonString = None, dbalias = None, dbkey = None, useCrest=False, crestServer=""):
429 self.
_getLoader(configType = configType, filename = filename, jsonString = jsonString, dbalias = dbalias, dbkey = dbkey,
430 useCrest=useCrest, crestServer=crestServer)
434 def _getLoader(self, *, configType, filename = None, jsonString = None, dbalias = None, dbkey = None,
435 useCrest:bool =
False, crestServer:str =
""):
438 elif dbalias
and dbkey:
446 raise RuntimeError(
"Neither input file, nor JSON nor db alias and key provided")
464 """ returns the configuration """
469 print(json.dumps(self.
_config, indent = 4, separators=(
',',
': ')))
475 return self[
"filetype"]
478 """ print summary info, should be overridden by derived classes """
479 log.info(
"Configuration name: {0}".
format(self.
name()))
480 log.info(
"Configuration size: {0}".
format(len(self)))
484 filename: str = self.
loader.getWriteFilename()
485 with open(filename,
'w')
as fh:
486 json.dump(self.
config(), fh, indent = 4, separators=(
',',
': '))
487 log.info(
"Wrote file %s", filename)