ATLAS Offline Software
han.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
2 
3 from cppyy.gbl import dqi
4 import ROOT
5 import re
6 import os
7 import string
8 from typing import List, Dict, Tuple, Optional, Iterable
9 import logging
10 log = logging.getLogger('DataQualityInterfaces')
11 
12 def logLevel(level: str='INFO') -> None:
13  log.setLevel(level)
14 
15 
16 def copyMetadata(newgroup: dqi.HanConfigAssessor, oldgroup: dqi.HanConfigAssessor,
17  input: str=None, algrefname: str=None) -> None:
18  """Copy the configuration of an old algorithm to a new one, but without the
19  tree structure"""
20  newgroup.SetAlgName(oldgroup.GetAlgName())
21  newgroup.SetAlgLibName(oldgroup.GetAlgLibName())
22  if algrefname is None:
23  newgroup.SetAlgRefName(oldgroup.GetAlgRefName())
24  else:
25  newgroup.SetAlgRefName(algrefname)
26  for par in oldgroup.GetAllAlgPars():
27  newgroup.AddAlgPar(par)
28  for par in oldgroup.GetAllAlgStrPars():
29  newgroup.AddAlgStrPar(par)
30  for lim in oldgroup.GetAllAlgLimits():
31  newgroup.AddAlgLimit(lim)
32  for ann in oldgroup.GetAllAnnotations():
33  if input is not None and ann.GetName() == 'inputname':
34  ann.SetValue(input)
35  newgroup.AddAnnotation(ann)
36  newgroup.SetWeight(oldgroup.GetWeight())
37 
38 
39 def count_slashes(itr: Iterable[Tuple[str, dqi.HanConfigGroup]]) -> Dict[int, List[Tuple[str, dqi.HanConfigGroup]]]:
40  from collections import defaultdict
41  rv = defaultdict(list)
42  for s, g in itr:
43  rv[s.count('/')].append((s,g))
44  return rv
45 
46 
47 def Initialize( configName: str, inputFileName: str, prefix: str ) -> Optional[dqi.HanConfigGroup]:
48  """ Handle everything in Python, testing only """
49 
50  m_config = ROOT.TFile.Open( configName, "READ" )
51  if not m_config:
52  log.error(f"HanConfig::Initialize() cannot open file \"{configName}\"\n")
53  return None
54 
55  key = m_config.FindKey("top_level")
56  if not key:
57  log.error(f"HanConfig::Initialize() cannot find configuration in file \"{configName}\"\n")
58  return None
59 
60  top_level = key.ReadObj()
61  if not isinstance(top_level, dqi.HanConfigGroup):
62  log.error(f"HanConfig::Initialize() cannot find configuration in file \"{configName}\"\n")
63  return None
64 
65  inputFile = ROOT.TFile.Open(inputFileName, 'READ')
66 
67  return FixRegion(m_config, top_level, inputFile.Get(prefix))
68 
69 
70 def FixRegion(config: ROOT.TDirectory, top_level: dqi.HanConfigGroup, td: ROOT.TDirectory) -> dqi.HanConfigGroup:
71  """Main code to translate the configuration given by 'top_level' into the
72  final configuration by expanding the regexes using the objects in 'td'"""
73  log.info('Translating regexes...')
74  HanConfigGroup = dqi.HanConfigGroup
75 
76  refmapcache = {} # Avoid repeated lookups of reference TMaps
77  mapping = {}
78  mapping_regex = {}
79  mapping_groups = {'top_level': top_level}
80  mapping_assessors_final = {}
81  mapping_groups_final: Dict[str, dqi.HanConfigGroup] = {}
82  iterate_hcfg(top_level, mapping, mapping_regex, mapping_groups)
83  mapping_assessors_final.update(mapping)
84  for path, g in mapping_groups.items():
85  newgroup = HanConfigGroup()
86  newgroup.SetName(g.GetName())
87  newgroup.SetPathName(path)
88  copyMetadata(newgroup, g)
89  mapping_groups_final[path] = newgroup
90 
91 
92  l: List[str] = []
93  iterate_objs(td, l, td.GetPath())
94  for path, (a, p) in mapping_regex.items():
95  pre = re.compile(p)
96  for h in l:
97  m = pre.fullmatch(h)
98  if m:
99  log.debug(f'match {p} with {h}')
100  tokenized_path = path.split('/')
101  orig_fullpath = []
102  new_fullpath = []
103  for tok in tokenized_path[:-1]:
104  formatted_tok = string.Template(tok).substitute(m.groupdict())
105  orig_fullpath.append(tok)
106  new_fullpath.append(formatted_tok)
107  orig = '/'.join(orig_fullpath)
108  target = '/'.join(new_fullpath)
109  if target not in mapping_groups_final: # need to clone
110  log.debug(f'Need to adapt {orig} to {target}')
111  if orig in mapping_groups_final:
112  log.debug(f'Deleting {orig}')
113  del mapping_groups_final[orig]
114  newgroup = HanConfigGroup()
115  newgroup.SetName(formatted_tok)
116  newgroup.SetPathName(target)
117  oldgroup = mapping_groups[orig]
118  copyMetadata(newgroup, oldgroup)
119  mapping_groups_final[target] = newgroup
120 
121  tok = tokenized_path[-1]
122  orig_fullpath.append(tok)
123  hname = h.split('/')[-1]
124  # special case: hname@LABEL in the original token
125  hextra = '' if '@' not in tok else ('@' + tok.rsplit('@',1)[1])
126  hname += hextra
127  new_fullpath.append(hname)
128  orig = '/'.join(orig_fullpath)
129  target = '/'.join(new_fullpath)
130  log.debug(f'Map from {orig} to {target}')
131 
132  newass = dqi.HanConfigAssessor()
133  newass.SetName(h + hextra)
134  algrefname = a.GetAlgRefName()
135  # patch up reference if it's embedded in a TMap
136  if a.GetAlgRefName() != "":
137  ref = refmapcache.get(a.GetAlgRefName())
138  if ref is None:
139  ref = config.Get(a.GetAlgRefName())
140  refmapcache[a.GetAlgRefName()] = ref
141  if not ref:
142  log.error('Unable to find references for', orig)
143  else:
144  if isinstance(ref, ROOT.TMap):
145  algrefnameptr = ref.GetValue(h)
146  algrefname = algrefnameptr.GetString().Data() if algrefnameptr else ""
147  if algrefname and isinstance(config.Get(algrefname), ROOT.TMap):
148  log.error(f'Reference for {newass} is somehow still a TMap')
149  copyMetadata(newass, mapping_regex[orig][0], input=h, algrefname=algrefname)
150  mapping_assessors_final[target] = (newass, newass.GetHistPath())
151  log.debug(f'change {orig} to {target}')
152 
153  # assemble new tree
154  # need to do it in this order or structure will be lost (there is a lot of cloning going on)
155  for p, (a, _) in mapping_assessors_final.items():
156  try:
157  mapping_groups_final[os.path.dirname(p)].AddAssessor(a)
158  except Exception as e:
159  log.error(f'Unable to look up assessor parent directory. Full error follows\n{e}: path {p}, assessor {a}, assessor name {a.GetName()}')
160  # need to in reverse order of depth. First count number of slashes:
161  keydepths = count_slashes(mapping_groups_final.items())
162  for _, l2 in sorted(keydepths.items(), reverse=True):
163  for p, g in l2:
164  if log.level <= logging.DEBUG:
165  log.debug(f'Final additions for {p}, {g.GetName()}, {g.GetPathName()} into {os.path.dirname(p)}')
166  if p != 'top_level':
167  try:
168  mapping_groups_final[os.path.dirname(p)].AddGroup(g)
169  except KeyError:
170  log.error(f'Unable to attach group to parent. Details: group path {g.GetPathName()}, attempted parent is {os.path.dirname(p)}')
171 
172  for v in refmapcache.values():
173  # fully clean up reference maps, will not happen otherwise
174  if isinstance(v, ROOT.TMap):
175  v.DeleteAll()
176 
177  return mapping_groups_final['top_level']
178 
179 
180 def iterate_hcfg(group: dqi.HanConfigGroup,
181  mapping: Dict[str,Tuple[dqi.HanConfigAssessor, str]],
182  mapping_regex: Dict[str,Tuple[dqi.HanConfigAssessor, str]],
183  mapping_groups: Dict[str, dqi.HanConfigGroup]) -> None:
184  """Recurse to collect all information on DQ configuration"""
185  import os.path
186  for a in group.GetAllAssessors():
187  m = mapping_regex if a.GetIsRegex() else mapping
188  m[os.path.join(group.GetPathName(), os.path.basename(a.GetName()))] = (a, a.GetHistPath())
189  for g in group.GetAllGroups():
190  mapping_groups[g.GetPathName()] = g
191  iterate_hcfg(g, mapping, mapping_regex, mapping_groups)
192 
193 
194 def iterate_objs(thisdir: ROOT.TDirectory, l: List[str], prefix: str):
195  """Recurse to find the names of all objects"""
196  import os.path
197  # inconsistent ROOT handling of top directory name of file
198  prefix = os.path.normpath(prefix)
199  prefixlen = len(prefix) + 1
200  fixeddirname = os.path.normpath(thisdir.GetPath())[prefixlen:]
201  for k in thisdir.GetListOfKeys():
202  if k.GetClassName().startswith('TDirectory'):
203  iterate_objs(k.ReadObj(), l, prefix)
204  else:
205  if k.GetName() != 'metadata':
206  l.append(os.path.join(fixeddirname, k.GetName()))
Data
@ Data
Definition: BaseObject.h:11
han.count_slashes
Dict[int, List[Tuple[str, dqi.HanConfigGroup]]] count_slashes(Iterable[Tuple[str, dqi.HanConfigGroup]] itr)
Definition: han.py:39
han.FixRegion
dqi.HanConfigGroup FixRegion(ROOT.TDirectory config, dqi.HanConfigGroup top_level, ROOT.TDirectory td)
Definition: han.py:70
han.copyMetadata
None copyMetadata(dqi.HanConfigAssessor newgroup, dqi.HanConfigAssessor oldgroup, str input=None, str algrefname=None)
Definition: han.py:16
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
dqi::HanConfigGroup
Definition: HanConfigGroup.h:23
han.logLevel
None logLevel(str level='INFO')
Definition: han.py:12
dqi::HanConfigAssessor
Definition: HanConfigAssessor.h:37
DerivationFramework::TriggerMatchingUtils::sorted
std::vector< typename T::value_type > sorted(T begin, T end)
Helper function to create a sorted vector from an unsorted one.
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
han.iterate_objs
def iterate_objs(ROOT.TDirectory thisdir, List[str] l, str prefix)
Definition: han.py:194
han.Initialize
Optional[dqi.HanConfigGroup] Initialize(str configName, str inputFileName, str prefix)
Definition: han.py:47
RCU::substitute
std::string substitute(const std::string &str, const std::string &pattern, const std::string &with)
effects: substitute all occurences of "pattern" with "with" in the string "str" returns: the substitu...
Definition: StringUtil.cxx:24
han.iterate_hcfg
None iterate_hcfg(dqi.HanConfigGroup group, Dict[str, Tuple[dqi.HanConfigAssessor, str]] mapping, Dict[str, Tuple[dqi.HanConfigAssessor, str]] mapping_regex, Dict[str, dqi.HanConfigGroup] mapping_groups)
Definition: han.py:180