ATLAS Offline Software
ConfigUtils.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 import json
4 from AthenaCommon.Utils.unixtools import find_datafile
5 from AthenaCommon.Logging import logging
6 
def getTrkAnaDicts( flags, input_file, unpackChains=False ):
    '''
    utility function to retrieve the flag dictionary
    for every TrackAnalysis from an input JSON file

    Arguments:
      flags        -- configuration flags, only forwarded to getChainList
      input_file   -- name of the JSON file, resolved via find_datafile;
                      the literal string "Default" means "no file" and
                      yields an empty dictionary
      unpackChains -- if True, the result is passed through
                      unpackTrkAnaDicts before being returned

    Returns a dictionary { analysis name : analysis settings }, where each
    settings dictionary gets "anaTag" set to "_<name>", "SubFolder"
    defaulted to the analysis name and terminated with "/", and
    "ChainNames" (if present) replaced by the output of getChainList.

    Raises FileNotFoundError (a subclass of Exception) if input_file
    cannot be located.
    '''
    analysesDict = {}

    # "Default" is the sentinel for "no analyses file requested"
    if input_file == "Default":
        return analysesDict

    dataPath = find_datafile( input_file )
    if dataPath is None:
        raise FileNotFoundError(f"Input file with analyses definition: {input_file} could not be found, for files given with absolute path use ./ prefix")

    with open( dataPath, "r" ) as input_json_file:
        analysesDictTmp = json.load( input_json_file )

    if analysesDictTmp:
        for trkAnaName, trkAnaDict in analysesDictTmp.items():
            analysesDict[trkAnaName] = trkAnaDict

            trkAnaDict["anaTag"] = "_" + trkAnaName

            # a missing or empty SubFolder falls back to the analysis name;
            # the trailing "/" is always appended
            if not trkAnaDict.get( "SubFolder" ):
                trkAnaDict["SubFolder"] = trkAnaName
            trkAnaDict["SubFolder"] += "/"

            # replace the chain list given in the JSON by whatever
            # getChainList returns for it
            if "ChainNames" in trkAnaDict:
                trkAnaDict["ChainNames"] = getChainList( flags, trkAnaDict["ChainNames"] )

    return unpackTrkAnaDicts( analysesDict ) if unpackChains else analysesDict
48 
49 
def unpackTrkAnaDicts( analysesDictIn ):
    '''
    utility function to define a separate TrackAnalysis
    for each configured trigger chain

    Every entry that has a "ChainNames" list is replaced by one entry per
    chain, named "<analysis>_<chain>", whose "anaTag" gets the "_<chain>"
    suffix and whose "ChainNames" holds only that chain. Entries without
    "ChainNames" are passed through unchanged (same object).
    An empty/None input is returned as it is.
    '''
    if not analysesDictIn:
        return analysesDictIn

    unpacked = {}
    for anaName, anaSettings in analysesDictIn.items():
        if "ChainNames" not in anaSettings:
            unpacked[ anaName ] = anaSettings
            continue

        for chainName in anaSettings["ChainNames"]:
            chainSuffix = "_" + chainName
            # shallow copy of the settings with the per-chain overrides
            unpacked[ anaName + chainSuffix ] = {
                **anaSettings,
                "anaTag" : anaSettings["anaTag"] + chainSuffix,
                "ChainNames" : [ chainName ],
            }

    return unpacked
71 
72 
73 
def getChainList( flags, regexChainList=None ):
    '''
    utility function to retrieve full list of
    configured trigger chains matching
    the passed regex list of chains

    Arguments:
      flags          -- configuration flags handed to getHLTMenuAccess;
                        if they are not locked, a locked clone is used
                        and the caller's flags are left untouched
      regexChainList -- iterable of regular expressions; a chain is
                        selected when an expression matches its full name

    Returns a new list with the names of the matching chains, grouped by
    regex in the order given. A chain matching several expressions is
    listed once per expression. An empty or missing regexChainList gives
    a new empty list.
    '''
    # always hand back a fresh list, never a shared default object
    if not regexChainList:
        return []

    # NOTE(review): presumably the menu access needs locked flags - confirm
    if not flags.locked():
        flags_tmp = flags.clone()
        flags_tmp.lock()
        flags = flags_tmp

    import re
    from TrigConfigSvc.TriggerConfigAccess import getHLTMenuAccess
    chainsMenu = getHLTMenuAccess( flags )

    configChains = []
    for regexChain in regexChainList:
        # fullmatch also works for patterns with capturing groups, where
        # comparing re.findall output to the chain name never succeeds
        pattern = re.compile( regexChain )
        configChains.extend( item for item in chainsMenu
                             if pattern.fullmatch( item ) )

    return configChains
100 
101 
def getPlotsDefList( flags ):
    '''
    Open json files and load all merged contents
    in a dictionary, which is later converted to a
    list of strings, each to be parsed in a
    (flattened) json format

    Steps:
      1. read the list of definition files named by
         flags.PhysVal.IDTPM.plotsDefFileList and merge their contents
      2. flatten every plot definition (plus its "name") into a json string
      3. replace every "$<key>" by the first element of the matching entry
         of flags.PhysVal.IDTPM.plotsCommonValuesFile
      4. for strings containing "$TRKTYPE" or "$TRKTAG", emit one copy for
         the test track type and one for the reference track type

    Returns the list of strings, or None (after logging an error)
    if any of the input files cannot be found.
    '''
    log = logging.getLogger( "getPlotsDefList" )

    # locate and read the list of json definition files
    fileListName = flags.PhysVal.IDTPM.plotsDefFileList
    log.debug( "plotsDefFileList : %s", fileListName )
    fileListPath = find_datafile( fileListName )
    if fileListPath is None:
        log.error( "plotsDefFileList not found" )
        return None

    with open( fileListPath, "r" ) as fileList :
        defFileNames = fileList.read().splitlines()

    # merge all histogram definitions into a single dictionary
    mergedDefs = {}
    for defFileName in defFileNames :
        defFilePath = find_datafile( defFileName )
        log.debug( "Reading input plots definitions : %s", defFilePath )
        if defFilePath is None:
            log.error( "plotsDefFile %s not found", defFileName )
            return None

        with open( defFilePath, "r" ) as defFile :
            mergedDefs.update( json.load( defFile ) )

    # one flattened json string per histogram definition
    flatDefStrs = [
        json.dumps( flatten_json( { **plotDef, "name" : plotName } ) )
        for plotName, plotDef in mergedDefs.items()
    ]

    # load the common values used to fill the "$<key>" fields
    commonValuesPath = find_datafile( flags.PhysVal.IDTPM.plotsCommonValuesFile )
    if commonValuesPath is None :
        log.error( "plotsCommonValuesFile not found" )
        return None

    with open( commonValuesPath, "r" ) as commonValuesFile :
        commonValues = dict( json.load( commonValuesFile ) )

    resolvedDefStrs = []
    for defStr in flatDefStrs :
        for key, value in commonValues.items() :
            defStr = defStr.replace( "$" + key, value[0] )
        resolvedDefStrs.append( defStr )

    # labels of the two track types (test first, reference second)
    testLabel = getLabel( flags, flags.PhysVal.IDTPM.currentTrkAna.TestType )
    refLabel = getLabel( flags, flags.PhysVal.IDTPM.currentTrkAna.RefType )

    # expand the track-type dependent definitions, keep the others as they are
    expandedDefStrs = []
    for defStr in resolvedDefStrs :
        if "$TRKTAG" in defStr or "$TRKTYPE" in defStr :
            expandedDefStrs.extend(
                defStr.replace( "$TRKTYPE", label[0] ).replace( "$TRKTAG", label[1] )
                for label in ( testLabel, refLabel ) )
        else :
            expandedDefStrs.append( defStr )

    return expandedDefStrs
187 
188 
def getLabel( flags, key ) :
    '''
    utility function returning the label pair
    [ plot title text, short tag ] for a track type

    For key "Offline", a non-empty
    flags.PhysVal.IDTPM.currentTrkAna.SelectOfflineObject that does not
    contain "Truth" is appended to the key (e.g. "OfflineElectron").
    For any other key the flags are not read.

    Raises KeyError if the resulting key is not a known track type.
    '''
    if key == "Offline":
        selectedObject = flags.PhysVal.IDTPM.currentTrkAna.SelectOfflineObject
        if selectedObject and "Truth" not in selectedObject:
            key += selectedObject

    # built on every call, so callers get lists they can safely modify
    trkLabelsDict = {
        "EFTrigger" : [ "EF Trigger track", "eftrig" ],
        "Trigger" : [ "Trigger track", "trig" ],
        "Offline" : [ "Offline track", "offl" ],
        "OfflineElectron" : [ "Offline e^{#pm} track", "offEle" ],
        "OfflineMuon" : [ "Offline #mu^{#pm} track", "offMu" ],
        "OfflineTau" : [ "Offline #tau^{#pm} track", "offTau" ],
        "Truth" : [ "Truth particle", "truth" ],
        "TruthElectron" : [ "Truth e^{#pm}", "truthEle" ],
        "TruthMuon" : [ "Truth #mu^{#pm}", "truthMu" ],
        # tag was "truthEle", a copy of the TruthElectron one
        "TruthTau" : [ "Truth #tau^{#pm}", "truthTau" ],
    }
    return trkLabelsDict[ key ]
206 
207 
def getTag( flags, key ) :
    '''
    utility function returning only the short tag,
    i.e. the second element of what getLabel gives for key
    '''
    return getLabel( flags, key )[ 1 ]
211 
212 
def flatten_json( y ) :
    '''
    utility function to turn a nested structure of dicts and lists
    into a single-level dictionary

    The key of each leaf is the chain of dict keys / list indices leading
    to it, joined by "_" (e.g. { "a" : { "b" : [ 5 ] } } gives
    { "a_b_0" : 5 }). Empty dicts and lists contribute no entry.
    A non-container input is stored under the empty-string key.
    '''
    flat = {}

    def walk( node, prefix ):
        # only exact dict / list instances are descended into
        if type( node ) is dict:
            for key, child in node.items():
                walk( child, prefix + key + '_' )
        elif type( node ) is list:
            for index, child in enumerate( node ):
                walk( child, prefix + str( index ) + '_' )
        else:
            # leaf: drop the trailing "_" of the accumulated prefix
            flat[ prefix[:-1] ] = node

    walk( y, '' )
    return flat
replace
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition: hcg.cxx:307
ConfigUtils.unpackTrkAnaDicts
def unpackTrkAnaDicts(analysesDictIn)
Definition: ConfigUtils.py:50
ConfigUtils.flatten_json
def flatten_json(y)
Definition: ConfigUtils.py:213
ConfigUtils.getTrkAnaDicts
def getTrkAnaDicts(flags, input_file, unpackChains=False)
Definition: ConfigUtils.py:7
ConfigUtils.getPlotsDefList
def getPlotsDefList(flags)
Definition: ConfigUtils.py:102
ConfigUtils.getLabel
def getLabel(flags, key)
Definition: ConfigUtils.py:189
python.TriggerConfigAccess.getHLTMenuAccess
HLTMenuAccess getHLTMenuAccess(flags=None)
Definition: TriggerConfigAccess.py:196
ConfigUtils.getTag
def getTag(flags, key)
Definition: ConfigUtils.py:208
Trk::open
@ open
Definition: BinningType.h:40
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
str
Definition: BTagTrackIpAccessor.cxx:11
python.Bindings.keys
keys
Definition: Control/AthenaPython/python/Bindings.py:790
python.Utils.unixtools.find_datafile
def find_datafile(fname, pathlist=None, access=os.R_OK)
pathresolver-like helper function
Definition: unixtools.py:67
ConfigUtils.getChainList
def getChainList(flags, regexChainList=[])
Definition: ConfigUtils.py:74