ATLAS Offline Software
ConfigUtils.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 import json
4 from AthenaCommon.Utils.unixtools import find_datafile
5 from AthenaCommon.Logging import logging
6 
7 def getTrkAnaDicts( flags ):
8  '''
9  utility function to retrieve the flag dictionary
10  for every TrackAnalysis from an input JSON file
11  '''
12  analysesDict = {}
13 
14  input_file = flags.PhysVal.IDTPM.trkAnaCfgFile
15 
16 
17  if input_file == "Default":
18  return analysesDict
19 
20 
21  dataPath = find_datafile( input_file )
22  if dataPath is None and input_file != "Default":
23  raise Exception(f"Input file with analyses definition: {input_file} could not be found, for files given with absolute path use ./ prefix")
24  if dataPath is None:
25  return analysesDict
26 
27 
28  analysesDictTmp = {}
29  with open( dataPath, "r" ) as input_json_file:
30  analysesDictTmp = json.load( input_json_file )
31 
32 
33  if analysesDictTmp:
34  for trkAnaName, trkAnaDict in analysesDictTmp.items():
35 
36  analysesDict.update( { trkAnaName : trkAnaDict } )
37 
38  analysesDict[trkAnaName]["anaTag"] = "_" + trkAnaName
39 
40  if ( "SubFolder" not in analysesDict[trkAnaName].keys() or
41  analysesDict[trkAnaName]["SubFolder"] == "" ) :
42  analysesDict[trkAnaName]["SubFolder"] = trkAnaName
43  analysesDict[trkAnaName]["SubFolder"] += "/"
44 
45  if "ChainNames" in analysesDict[trkAnaName]:
46  fullChainList = getChainList( flags, analysesDict[trkAnaName]["ChainNames"] )
47  analysesDict[trkAnaName]["ChainNames"] = fullChainList
48 
49  return unpackTrkAnaDicts( analysesDict ) if flags.PhysVal.IDTPM.unpackTrigChains else analysesDict
50 
51 
52 def unpackTrkAnaDicts( analysesDictIn ):
53  '''
54  utility function to define a separate TrackAnalysis
55  for each configured trigger chain
56  '''
57  if not analysesDictIn : return analysesDictIn
58 
59  analysesDictOut = {}
60  for trkAnaName, trkAnaDict in analysesDictIn.items():
61  if "ChainNames" in trkAnaDict:
62  chainList = trkAnaDict["ChainNames"]
63  for chain in chainList:
64  trkAnaName_new = trkAnaName + "_" + chain
65  trkAnaDict_new = dict( trkAnaDict )
66  trkAnaDict_new["anaTag"] = trkAnaDict["anaTag"] + "_" + chain
67  trkAnaDict_new["ChainNames"] = [ chain ]
68  analysesDictOut.update( { trkAnaName_new : trkAnaDict_new } )
69  else:
70  analysesDictOut.update( { trkAnaName : trkAnaDict } )
71 
72  return analysesDictOut
73 
74 
75 
76 def getChainList( flags, regexChainList=[] ):
77  '''
78  utility function to retrieve full list of
79  configured trigger chains matching
80  the passed regex list of chains
81  '''
82  if not regexChainList: return regexChainList
83 
84  if not flags.locked():
85  flags_tmp = flags.clone()
86  flags_tmp.lock()
87  flags = flags_tmp
88 
89  from TrigConfigSvc.TriggerConfigAccess import getHLTMenuAccess
90  chainsMenu = getHLTMenuAccess( flags )
91 
92  import re
93  configChains = []
94  for regexChain in regexChainList:
95  for item in chainsMenu:
96  chains = re.findall( regexChain, item )
97  for chain in chains:
98  if chain is not None and chain == item:
99  configChains.append( chain )
100 
101  return configChains
102 
103 
104 def getPlotsDefList( flags ):
105  '''
106  Open json files and load all merged contents
107  in a dictionary, which is later converted to a
108  list of strings, each to be parsed in a
109  (flattened) json format
110  '''
111  log = logging.getLogger( "getPlotsDefList" )
112 
113  # open the list of json files
114  log.debug( "plotsDefFileList : %s", flags.PhysVal.IDTPM.plotsDefFileList )
115  listPath = find_datafile( flags.PhysVal.IDTPM.plotsDefFileList )
116  if listPath is None:
117  log.error( "plotsDefFileList not found" )
118  return None
119 
120  plotsDefFileNames = []
121  with open( listPath, "r" ) as input_flist :
122  plotsDefFileNames = input_flist.read().splitlines()
123 
124  # creating the basic histogrm definition dictionary
125  plotsDefDict = {}
126 
127  for plotsDefFileName in plotsDefFileNames :
128  dataPath = find_datafile( plotsDefFileName )
129  log.debug( "Reading input plots definitions : %s", dataPath )
130  if dataPath is None:
131  log.error( "plotsDefFile %s not found", plotsDefFileName )
132  return None
133 
134  with open( dataPath, "r" ) as input_json_file :
135  plotsDefDict.update( json.load( input_json_file ) )
136 
137  # Expand plots definitions for resolutions and pulls
138  # from corresponding TH2F Helpers
139  plotsDefDict = updateResolutionPlots( plotsDefDict )
140 
141  # Turn all histo definitions into a list of strings
142  # each string has a flattened json format
143  def flatten_json( y ) :
144  out = {}
145  def flatten(x, name=''):
146  if type(x) is dict:
147  for a in x:
148  flatten(x[a], name + a + '_')
149  else:
150  out[name[:-1]] = x
151  flatten(y)
152  return out
153 
154  plotsDefStrList_v1 = []
155  for plotName, plotDict in plotsDefDict.items():
156  newPlotDict = plotDict.copy()
157  newPlotDict[ "name" ] = plotName
158 
159  # flatten json histo dict
160  plotDictFlat = flatten_json( newPlotDict )
161 
162  # Turn json into string
163  plotDefStr = str( json.dumps( plotDictFlat ) )
164 
165  # append to list
166  plotsDefStrList_v1.append( plotDefStr )
167 
168  # Replace standard common fields (e.g. &ETAMAX)
169  # with corresponding values (read from default json)
170  plotsCommonValuesFileName = flags.PhysVal.IDTPM.plotsCommonValuesFile
171  if not plotsCommonValuesFileName :
172  if flags.Detector.GeometryID :
173  plotsCommonValuesFileName = "InDetTrackPerfMon/PlotsDefCommonValues.json"
174  elif flags.Detector.GeometryITk :
175  plotsCommonValuesFileName = "InDetTrackPerfMon/PlotsDefCommonValues_ITk.json"
176  else :
177  log.error( "Could not get detector geometry for plotsCommonValuesFile" )
178  return None
179 
180  commonValuesPath = find_datafile( plotsCommonValuesFileName )
181  if commonValuesPath is None :
182  log.error( "plotsCommonValuesFile not found: %s", plotsCommonValuesFileName )
183  return None
184 
185  commonValuesDict = {}
186  with open( commonValuesPath, "r" ) as input_commonValues :
187  commonValuesDict.update( json.load( input_commonValues ) )
188 
189  plotsDefStrList_v2 = []
190  for plotDefStr in plotsDefStrList_v1 :
191  newPlotDefStr = plotDefStr
192  if commonValuesDict :
193  for key, value in commonValuesDict.items() :
194  plotDefStr_tmp = newPlotDefStr.replace( "$"+key, value[0] )
195  newPlotDefStr = plotDefStr_tmp
196  plotsDefStrList_v2.append( newPlotDefStr )
197 
198  # Now expand the list to account for all required track types
199  testLabel = getLabel( flags, flags.PhysVal.IDTPM.currentTrkAna.TestType )
200  refLabel = getLabel( flags, flags.PhysVal.IDTPM.currentTrkAna.RefType )
201  trkLabels = [ testLabel, refLabel ]
202 
203  if flags.PhysVal.IDTPM.currentTrkAna.MatchingType == "EFTruthMatch":
204  trkLabels.append( getLabel( flags, "Truth" ) )
205 
206  plotsDefStrList = []
207  for plotsDefStr in plotsDefStrList_v2 :
208  plotsDefStr = plotsDefStr.replace( "$TESTTYPE", testLabel[0] ).replace( "$TESTTAG", testLabel[1] )
209  plotsDefStr = plotsDefStr.replace( "$REFTYPE", refLabel[0] ).replace( "$REFTAG", refLabel[1] )
210  if ( "$TRKTAG" not in plotsDefStr ) and ( "$TRKTYPE" not in plotsDefStr ) :
211  plotsDefStrList.append( plotsDefStr )
212  continue
213  for trkLabel in trkLabels :
214  newPlotsDefStr = plotsDefStr.replace( "$TRKTYPE", trkLabel[0] ).replace( "$TRKTAG", trkLabel[1] )
215  if ( "$TRK2TAG" not in newPlotsDefStr ) and ( "$TRK2TYPE" not in newPlotsDefStr ) :
216  plotsDefStrList.append( newPlotsDefStr )
217  continue
218  for trk2Label in trkLabels :
219  newPlotsDefStr2 = newPlotsDefStr.replace( "$TRK2TYPE", trk2Label[0] ).replace( "$TRK2TAG", trk2Label[1] )
220  plotsDefStrList.append( newPlotsDefStr2 )
221 
222  return plotsDefStrList
223 
224 
225 def getLabel( flags, key ) :
226  if key == "Offline" and flags.PhysVal.IDTPM.currentTrkAna.SelectOfflineObject:
227  if "Truth" not in flags.PhysVal.IDTPM.currentTrkAna.SelectOfflineObject:
228  key += flags.PhysVal.IDTPM.currentTrkAna.SelectOfflineObject
229  trkLabelsDict = {
230  "EFTrigger" : [ "EF Trigger track", "eftrig" ],
231  "Trigger" : [ "Trigger track", "trig" ],
232  "Offline" : [ "Offline track", "offl" ],
233  "OfflineElectron" : [ "Offline e^{#pm} track", "offEle" ],
234  "OfflineMuon" : [ "Offline #mu^{#pm} track", "offMu" ],
235  "OfflineTau" : [ "Offline #tau^{#pm} track", "offTau" ],
236  "Truth" : [ "Truth particle", "truth" ],
237  "TruthElectron" : [ "Truth e^{#pm}", "truthEle" ],
238  "TruthMuon" : [ "Truth #mu^{#pm}", "truthMu" ],
239  "TruthTau" : [ "Truth #tau^{#pm}", "truthEle" ],
240  }
241  return trkLabelsDict[ key ]
242 
243 
244 def getTag( flags, key ) :
245  labels = getLabel( flags, key )
246  return labels[1]
247 
248 
249 def updateResolutionPlots( myPlotsDefDict ) :
250  # initialize output dict to input
251  outDict = myPlotsDefDict.copy()
252 
253  items = [ "res", "pull" ]
254  plist = [ "mean", "width" ]
255  iDict = {
256  "res" : {
257  "helper" : { "name" : "resHelper", "yTitle" : "residual" },
258  "mean" : { "name" : "resmean", "yTitle" : "bias" },
259  "width" : { "name" : "resolution", "yTitle" : "resolution" }
260  },
261  "pull" : {
262  "helper" : { "name" : "pullHelper", "yTitle" : "pull" },
263  "mean" : { "name" : "pullmean", "yTitle" : "pull mean" },
264  "width" : { "name" : "pullwidth", "yTitle" : "pull width" }
265  }
266  }
267 
268  for plotName, plotDict in myPlotsDefDict.items() :
269  # processing resHelpers and pullHelpers
270  for i in items:
271  if iDict[i]["helper"]["name"] in plotName :
272  # processing mean and width plots
273  for p in plist :
274  # Computing the derived plot (mean or width) name from helper
275  pName = plotName.replace( iDict[i]["helper"]["name"], iDict[i][p]["name"] )
276 
277  pDict = {}
278  if pName in outDict :
279  # plot definition already exists. Grabbing the original
280  pDict = outDict[ pName ]
281  else :
282  # plot definition doesn't exist.
283  # Computing and setting yAxis title from helper's yAxis'
284  # (e.g. "VAR residual [unit]" -> "VAR resolution [unit]")
285  yTitle = plotDict["yAxis"]["title"]
286  yTitle = yTitle.replace( iDict[i]["helper"]["yTitle"], iDict[i][p]["yTitle"] )
287  pDict.update( { "yAxis" : { "title" : yTitle } } )
288 
289  # forcing x-axis to be the same as helper's (in all cases) and type = TH1F
290  pDict.update( {
291  "type" : "TH1F",
292  "xAxis" : plotDict["xAxis"]
293  } )
294 
295  # update outdict
296  outDict.update( { pName : pDict } )
297 
298  return outDict
replace
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition: hcg.cxx:307
ConfigUtils.unpackTrkAnaDicts
def unpackTrkAnaDicts(analysesDictIn)
Definition: ConfigUtils.py:52
ConfigUtils.getTrkAnaDicts
def getTrkAnaDicts(flags)
Definition: ConfigUtils.py:7
ConfigUtils.getPlotsDefList
def getPlotsDefList(flags)
Definition: ConfigUtils.py:104
ConfigUtils.getLabel
def getLabel(flags, key)
Definition: ConfigUtils.py:225
ConfigUtils.updateResolutionPlots
def updateResolutionPlots(myPlotsDefDict)
Definition: ConfigUtils.py:249
python.TriggerConfigAccess.getHLTMenuAccess
HLTMenuAccess getHLTMenuAccess(flags=None)
Definition: TriggerConfigAccess.py:196
ConfigUtils.getTag
def getTag(flags, key)
Definition: ConfigUtils.py:244
Trk::open
@ open
Definition: BinningType.h:40
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
str
Definition: BTagTrackIpAccessor.cxx:11
python.Bindings.keys
keys
Definition: Control/AthenaPython/python/Bindings.py:798
python.Utils.unixtools.find_datafile
def find_datafile(fname, pathlist=None, access=os.R_OK)
pathresolver-like helper function --------------------------------------—
Definition: unixtools.py:67
ConfigUtils.getChainList
def getChainList(flags, regexChainList=[])
Definition: ConfigUtils.py:76