ATLAS Offline Software
TrigConfFrontier.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
3 
4 from AthenaCommon.Logging import logging
5 import time
6 import sys
7 
8 log = logging.getLogger( "TrigConfFrontier.py" )
9 
10 def getServerUrls(frontier_servers):
11  """
12  turns
13  '(serverurl=http://atlasfrontier-local.cern.ch:8000/atlr)(serverurl=http://atlasfrontier-ai.cern.ch:8000/atlr)'
14  into
15  ['http://atlasfrontier-local.cern.ch:8000/atlr','http://atlasfrontier-ai.cern.ch:8000/atlr']
16  """
17  from re import findall
18  return findall(r'\‍(serverurl=(.*?)\‍)',frontier_servers)
19 
20 
21 def resolveUrl(url):
22  """
23  Expects input string to be a URL or $FRONTIER_SERVER
24  Returns an accessible URL or None"""
25  import re
26  if re.match("http://",url): # simple URL specification http://...
27  return [url]
28 
29  if re.match(r'\‍(serverurl=(.*?)\‍)',url): # syntax of FRONTIER_SERVER
30  return getServerUrls(url)
31 
32 
33 def getFrontierCursor(urls, schema, loglevel = logging.INFO):
34  log.setLevel(loglevel)
35  url_list = resolveUrl(urls)
36  if len(url_list) == 0:
37  log.warning("Cannot find a valid frontier connection, will not return a Frontier cursor")
38  return None
39  else:
40  log.info(f"Will use Frontier server at {urls}")
41 
42  return FrontierCursor( urls = url_list, schema = schema)
43 
44 
45 # used by FrontierCursor
46 def replacebindvars(query, bindvars):
47  """Replaces the bound variables with the specified values,
48  disables variable binding
49  """
50  for var,val in list(bindvars.items()):
51  if query.find(":%s" % var)<0:
52  raise NameError("variable '%s' is not a bound variable in this query: %s" % (var, query) )
53  if isinstance (val, int):
54  query = query.replace(":%s" % var,"%s" % val)
55  else:
56  query = query.replace(":%s" % var,"%r" % val)
57  log.debug("Resolving bound variable '%s' with %r", var,val)
58  log.debug("Resolved query: %s", query)
59  return query
60 
61 
63  def __init__(self, urls, schema, refreshFlag=False, doDecode=True, retrieveZiplevel="zip", encoding="utf-8"):
64  self.urls = [str(x) + "/Frontier" for x in urls]#Add /Frontier to each URL
65  self.schema = schema
66  self.refreshFlag = refreshFlag
67  self.retrieveZiplevel = retrieveZiplevel
68  self.doDecode = doDecode
69  self.encoding = encoding
70 
71  def __str__(self):
72  s = "Using Frontier URL: %s\n" % self.urls
73  s += "Schema: %s\n" % self.schema
74  s += "Refresh cache: %s" % self.refreshFlag
75  return s
76 
77  def execute(self, query, bindvars={}):
78  if len(bindvars)>0:
79  query = replacebindvars(query,bindvars)
80 
81 
82  log.debug("Frontier URLs : %s", self.urls)
83  log.debug("Refresh cache : %s", self.refreshFlag)
84  log.debug("Query : %s", query)
85 
86  import base64, zlib, urllib.request, urllib.error, urllib.parse
87 
88  self.result = None
89 
90  for url in self.urls:
91  try:
92  compQuery = zlib.compress(query.encode("utf-8"),9)
93  base64Query = base64.binascii.b2a_base64(compQuery).decode("utf-8")
94  encQuery = base64Query.replace("+", ".").replace("\n","").replace("/","-").replace("=","_")
95  log.debug("Frontier Request : %s", encQuery)
96  frontierRequest="%s/type=frontier_request:1:DEFAULT&encoding=BLOB%s&p1=%s" % (url, self.retrieveZiplevel, encQuery)
97  request = urllib.request.Request(frontierRequest)
98  if self.refreshFlag:
99  request.add_header("pragma", "no-cache")
100 
101  frontierId = "TrigConfFrontier 1.0"
102  request.add_header("X-Frontier-Id", frontierId)
103 
104  queryStart = time.localtime()
105  log.debug("Query started: %s", time.strftime("%m/%d/%y %H:%M:%S %Z", queryStart))
106 
107  t1 = time.time()
108  result = urllib.request.urlopen(request,timeout=10).read().decode('utf-8')
109  t2 = time.time()
110 
111  queryEnd = time.localtime()
112  log.debug("Query ended: %s", time.strftime("%m/%d/%y %H:%M:%S %Z", queryEnd))
113  log.debug("Query time: %s [seconds]", (t2-t1))
114  log.debug("Result size: %i [seconds]", len(result))
115  self.result = result
116  self.checkResultForErrors()
117  return
118  except urllib.error.HTTPError:
119  log.warning("Problem with Frontier connection to %s trying next server", url)
120  except Exception as err:
121  log.warning("Problem with the request {0}".format(err))
122 
123  raise Exception("All servers failed")
124 
125 
126  def fetchall(self):
127  if self.doDecode: self.decodeResult()
128  return self.result
129 
131  ''' Parse the response, looking for errors '''
132  from xml.dom.minidom import parseString
133  dom = parseString(self.result)
134 
135  globalError = dom.getElementsByTagName("global_error")
136  for node in globalError:
137  raise Exception(node.getAttribute("msg"))
138 
139  qualityList = dom.getElementsByTagName("quality")
140  for node in qualityList:
141  if int(node.getAttribute("error")) > 0:
142  raise Exception(node.getAttribute("message"))
143 
144  def decodeResult(self):
145  from xml.dom.minidom import parseString
146  import base64, zlib, curses.ascii, re
147  #print ("Query result:\n", self.result)
148  dom = parseString(self.result)
149  dataList = dom.getElementsByTagName("data")
150  keepalives = 0
151  result = []
152  # Control characters represent records, but I won't bother with that now,
153  # and will simply replace those by space.
154  for data in dataList:
155  for node in data.childNodes:
156  # <keepalive /> elements may be present, combined with whitespace text
157  if node.nodeName == "keepalive":
158  # this is of type Element
159  keepalives += 1
160  continue
161  # else assume of type Text
162  if node.data.strip() == "":
163  continue
164  if keepalives > 0:
165  print (keepalives, "keepalives received\n")
166  keepalives = 0
167 
168  row = base64.decodebytes(node.data.encode())
169  if self.retrieveZiplevel != "":
170  row = zlib.decompress(row).decode("ISO-8859-1")
171 
172  #Hack to get these lines to work in python 2
173  if sys.version_info[0] < 3:
174  row = row.encode('ascii', 'xmlcharrefreplace')
175 
176  endFirstRow = row.find('\x07')
177  firstRow = row[:endFirstRow]
178  for c in firstRow:
179  if curses.ascii.isctrl(c):
180  firstRow = firstRow.replace(c, ' ')
181  fields = [x for i,x in enumerate(firstRow.split()) if i%2==0]
182  types = [x for i,x in enumerate(firstRow.split()) if i%2==1]
183  ptypes = []
184  for _f,t in zip(fields, types):
185  if t.startswith("NUMBER"):
186  if _f in ["HPR_VALUE", "L1CI_GLOBAL_JET_SCALE", "L1CI_GLOBAL_EM_SCALE"]: # float columns in run 2 DB if ",0" in t:
187  ptypes.append(float)
188  else:
189  ptypes.append(int)
190  else:
191  ptypes.append(str)
192 
193 
194  log.debug("Fields : %r", fields)
195  log.debug("DB Types : %r", types)
196  log.debug("Python Types: %r", ptypes)
197 
198  row = row[endFirstRow+1:]
199 
200  row_h = row.rstrip('\x07')
201 
202  if 'BLOB' in types:
203  row_h = re.sub("^.*?{","{",row_h)
204 
205  row_h = row_h.replace("\x07\x06",'.nNn.\x06')
206 
207  # pattern = re.compile("\x06\x00\x00\x00.",flags=re.S)
208  #replace pattern above more restrictive version, as longerstrings in the results
209  #have a size variable in the column separate that becomes visible if the string
210  #is large enough - this then broke the prevous decoding
211  pattern = re.compile("\x06\x00\x00..",flags=re.S)
212  row_h = pattern.sub('.xXx.',row_h)
213  row_h = row_h.replace("\x86", '.xXx.')
214 
215  row_h = row_h.split('.nNn.')
216  row_h = [r.split('.xXx.') for r in row_h]
217 
218  result = []
219  for r in row_h:
220  if r[0]=='': r[0:1]=[]
221  r = tuple([t(v) for t,v in zip(ptypes,r)])
222  result.append( r )
223 
224  self.result = result
225 
226 
227 
228 def testQuery(query, bindvars):
229  from TrigConfigSvc.TrigConfigSvcUtils import interpretConnection
230  connectionParameters = interpretConnection("TRIGGERDBMC")
231  cursor = getFrontierCursor( urls = connectionParameters['url'], schema = connectionParameters['schema'])
232  cursor.execute(query, bindvars)
233  log.info("Raw response:")
234  log.info(cursor.result)
235  cursor.decodeResult()
236  log.info("Decoded response:")
237  log.info(cursor.result[0][0])
238  if cursor.result[0][0] != 'MC_pp_v7':
239  return 1
240  return 0
241 
242 
243 if __name__=="__main__":
244  log = logging.getLogger( "TrigConfFrontier.py" )
245  log.setLevel(logging.DEBUG)
246 
247  dbalias = "TRIGGERDBMC"
248  query = "select distinct HPS.HPS_NAME from ATLAS_CONF_TRIGGER_RUN2_MC.HLT_PRESCALE_SET HPS where HPS.HPS_ID = :psk"
249  bindvars = { "psk": 260 }
250 
251  res = testQuery(query, bindvars) # pure python frontier query
252  sys.exit(res)
read
IovVectorMap_t read(const Folder &theFolder, const SelectionCriterion &choice, const unsigned int limit=10)
Definition: openCoraCool.cxx:569
replace
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition: hcg.cxx:307
AtlasMcWeight::decode
double decode(number_type binnedWeight)
Convert weight from unsigned to double.
Definition: AtlasMcWeight.cxx:32
python.TrigConfFrontier.testQuery
def testQuery(query, bindvars)
Definition: TrigConfFrontier.py:228
vtune_athena.format
format
Definition: vtune_athena.py:14
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.TrigConfFrontier.getServerUrls
def getServerUrls(frontier_servers)
Definition: TrigConfFrontier.py:10
python.TrigConfFrontier.FrontierCursor.encoding
encoding
Definition: TrigConfFrontier.py:69
python.TrigConfFrontier.getFrontierCursor
def getFrontierCursor(urls, schema, loglevel=logging.INFO)
Definition: TrigConfFrontier.py:33
python.TrigConfFrontier.FrontierCursor.fetchall
def fetchall(self)
Definition: TrigConfFrontier.py:126
read_hist_ntuple.t
t
Definition: read_hist_ntuple.py:5
python.utils.AtlRunQueryTriggerUtils.interpretConnection
def interpretConnection(connection, debug=False, resolveAlias=True)
Definition: AtlRunQueryTriggerUtils.py:230
python.TrigConfFrontier.FrontierCursor.execute
def execute(self, query, bindvars={})
Definition: TrigConfFrontier.py:77
python.TrigConfFrontier.resolveUrl
def resolveUrl(url)
Definition: TrigConfFrontier.py:21
python.TrigConfFrontier.FrontierCursor.decodeResult
def decodeResult(self)
Definition: TrigConfFrontier.py:144
python.TrigConfFrontier.FrontierCursor.result
result
Definition: TrigConfFrontier.py:88
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.TrigConfFrontier.FrontierCursor.retrieveZiplevel
retrieveZiplevel
Definition: TrigConfFrontier.py:67
python.TrigConfFrontier.FrontierCursor.refreshFlag
refreshFlag
Definition: TrigConfFrontier.py:66
DiTauRecTools::parseString
std::vector< TString > parseString(const TString &str, const TString &delim=",")
Definition: Reconstruction/DiTauRecTools/Root/HelperFunctions.cxx:19
python.TrigConfFrontier.FrontierCursor.urls
urls
Definition: TrigConfFrontier.py:64
python.TrigConfFrontier.FrontierCursor.__init__
def __init__(self, urls, schema, refreshFlag=False, doDecode=True, retrieveZiplevel="zip", encoding="utf-8")
Definition: TrigConfFrontier.py:63
python.TrigConfFrontier.FrontierCursor.schema
schema
Definition: TrigConfFrontier.py:65
pickleTool.object
object
Definition: pickleTool.py:30
str
Definition: BTagTrackIpAccessor.cxx:11
python.TrigConfFrontier.FrontierCursor.checkResultForErrors
def checkResultForErrors(self)
Definition: TrigConfFrontier.py:130
python.TrigConfFrontier.FrontierCursor.doDecode
doDecode
Definition: TrigConfFrontier.py:68
python.TrigConfFrontier.replacebindvars
def replacebindvars(query, bindvars)
Definition: TrigConfFrontier.py:46
python.TrigConfFrontier.FrontierCursor.__str__
def __str__(self)
Definition: TrigConfFrontier.py:71
python.TrigConfFrontier.FrontierCursor
Definition: TrigConfFrontier.py:62