ATLAS Offline Software
POOLtoEI_Skeleton.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 # @file POOLtoEI_Skeleton.py
4 # @purpose Configure the POOLtoEI transformation using the ComponentAccumulator
5 # @author Javier Sanchez
6 # @date March 2024
7 
8 from AthenaCommon import JobProperties
9 from PyJobTransforms.TransformUtils import (processPreExec, processPreInclude,
10  processPostExec, processPostInclude)
11 
12 # force no legacy job properties
13 JobProperties.jobPropertiesDisallowed = True
14 
15 
16 def configurePOOL2EIglobals(runArgs, flags):
17  """ Configure POOL2EI global vars """
18 
19  class AttributeDict(dict):
20  def __getattr__(self, attr):
21  return self[attr]
22 
23  def __setattr__(self, attr, value):
24  self[attr] = value
25 
26  job = AttributeDict()
27 
28  from AthenaCommon.Logging import logging
29  eilog = logging.getLogger('pool_to_ei')
30 
31  # Input
32  inputFileGiven = 0
33  for filetype in ('POOL', 'AOD', 'ESD', 'EVNT', 'HITS', 'RDO', 'AOD_MRG'):
34  if hasattr(runArgs, "input" + filetype + "File"):
35  In = getattr(runArgs, "input" + filetype + "File")
36  if type(In) is not list:
37  In = [In]
38  flags.Input.Files = In
39  inputFileGiven += 1
40 
41  if inputFileGiven == 0:
42  raise RuntimeError("No input file given")
43  if inputFileGiven > 1:
44  raise RuntimeError("Only one input file format is allowed")
45 
46  def getArgVal(args, key, defaultValue):
47  if hasattr(args, key):
48  return getattr(args, key)
49  else:
50  return defaultValue
51 
52  # Output
53  import os
54  job.Out = getArgVal(runArgs, "outputEIFile",
55  'pool2ei.{:08d}.spb'.format(os.getpid()))
56 
57  # options
58  job.DoProvenanceRef = getArgVal(runArgs, "provenance", True)
59  job.DoTriggerInfo = getArgVal(runArgs, 'trigger', True)
60  job.SendToBroker = getArgVal(runArgs, "sendtobroker", False)
61  job.EiDsName = getArgVal(runArgs, "eidsname", None)
62  job.TestBrk = getArgVal(runArgs, "testbrk", False)
63  job.EiFmt = getArgVal(runArgs, "eifmt", 0)
64 
65  # Tier0 job identification
66  job.TaskID = getArgVal(runArgs, "_taskid", None)
67  job.JobID = getArgVal(runArgs, "_jobid", None)
68  job.AttemptNumber = getArgVal(runArgs, "_attempt", None)
69 
70  def findFirstKeyforValue(dict, value):
71  """ Look for value in dict and return the first key or None if not found """
72  k = next((k for k, v in dict.items() if v == value), None)
73  return k
74 
75  # find key for 'EventStreamInfo'
76  EventStreamInfo_key = findFirstKeyforValue(
77  flags.Input.MetadataItems, 'EventStreamInfo')
78 
79  metadata_items = flags.Input.MetadataItems
80  job.meta_hlt_hltconfigkeys = (
81  '/TRIGGER/HLT/HltConfigKeys' in metadata_items)
82  job.meta_hlt_prescalekey = ('/TRIGGER/HLT/PrescaleKey' in metadata_items)
83  job.meta_lvl1_lvl1configkey = (
84  '/TRIGGER/LVL1/Lvl1ConfigKey' in metadata_items)
85  job.meta_hlt_menu = ('/TRIGGER/HLT/Menu' in metadata_items)
86  job.meta_lvl1_menu = ('/TRIGGER/LVL1/Menu' in metadata_items)
87  job.meta_triggermenu = ('TriggerMenu' in metadata_items)
88  job.meta_triggermenujson_hlt = ('TriggerMenuJson_HLT' in metadata_items)
89  job.meta_triggermenujson_l1 = ('TriggerMenuJson_L1' in metadata_items)
90 
91  eilog.info("meta_hlt_hltconfigkeys: {}".format(job.meta_hlt_hltconfigkeys))
92  eilog.info("meta_hlt_prescalekey: {}".format(job.meta_hlt_prescalekey))
93  eilog.info("meta_lvl1_lvl1configkey: {}".format(
94  job.meta_lvl1_lvl1configkey))
95  eilog.info("meta_hlt_menu: {}".format(job.meta_hlt_menu))
96  eilog.info("meta_lvl1_menu: {}".format(job.meta_lvl1_menu))
97  eilog.info("meta_triggermenu: {}".format(job.meta_triggermenu))
98  eilog.info("meta_triggermenujson_hlt: {}".format(
99  job.meta_triggermenujson_hlt))
100  eilog.info("meta_triggermenujson_l1: {}".format(
101  job.meta_triggermenujson_l1))
102 
103  collections = flags.Input.TypedCollections
104  job.item_eventinfo = ('EventInfo#EventInfo' in collections
105  or 'EventInfo#McEventInfo' in collections)
106  job.item_eventinfoBS = ('EventInfo#ByteStreamEventInfo' in collections)
107  job.item_xaod_eventinfo = ('xAOD::EventInfo#EventInfo' in collections)
108  job.item_xaod_TrigConfKeys = (
109  'xAOD::TrigConfKeys#TrigConfKeys' in collections)
110  job.item_xaod_TrigDecision = (
111  'xAOD::TrigDecision#xTrigDecision' in collections)
112 
113  if not (job.item_eventinfo or job.item_xaod_eventinfo or job.item_eventinfoBS):
114  # EventInfo should exist
115  job.item_eventinfo = True
116 
117  eilog.info("item_eventinfo: {}".format(job.item_eventinfo))
118  eilog.info("item_eventinfoBS: {}".format(job.item_eventinfoBS))
119  eilog.info("item_xaod_eventinfo: {}".format(job.item_xaod_eventinfo))
120  eilog.info("item_xaod_TrigConfKeys: {}".format(job.item_xaod_TrigConfKeys))
121  eilog.info("item_xaod_TrigDecision: {}".format(job.item_xaod_TrigDecision))
122 
123  job.projectName = flags.Input.ProjectName
124 
125  # if EVNT, disable trigger processing
126  if job.DoTriggerInfo:
127  try:
128  if (EventStreamInfo_key is not None and ('StreamEVGEN' in flags.Input.ProcessingTags or 'StreamEVNT' in flags.Input.ProcessingTags)):
129  eilog.info("Disable trigger processing for EVNT files")
130  job.DoTriggerInfo = False
131 
132  if (EventStreamInfo_key is not None and flags.Input.isMC):
133  if not (job.meta_hlt_hltconfigkeys or job.item_xaod_TrigConfKeys):
134  eilog.info("Disable trigger processing for MC files "
135  "with no trigger inside")
136  job.DoTriggerInfo = False
137  except Exception:
138  pass
139 
140  return job
141 
142 
143 def fromRunArgs(runArgs):
144  """ Configure and run the transformation """
145 
146  from AthenaCommon.Logging import logging
147  from AthenaCommon.Constants import INFO
148 
149  eilog = logging.getLogger('pool_to_ei')
150  eilog.info('****************** STARTING POOL->EI MAKING *****************')
151 
152  # configure flags
153 
154  from AthenaConfiguration.AllConfigFlags import initConfigFlags
155  flags = initConfigFlags()
156 
157  from PyJobTransforms.CommonRunArgsToFlags import commonRunArgsToFlags
158  commonRunArgsToFlags(runArgs, flags)
159 
160  # process pre-include/exec
161  processPreInclude(runArgs, flags)
162  processPreExec(runArgs, flags)
163 
164  # To respect --athenaopts
165  flags.fillFromArgs()
166 
167  # configure job globals form runArgs and flags
168  job = configurePOOL2EIglobals(runArgs, flags)
169 
170  # Lock flags
171  flags.lock()
172 
173  from AthenaConfiguration.MainServicesConfig import MainServicesCfg
174  cfg = MainServicesCfg(flags)
175  from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
176  cfg.merge(PoolReadCfg(flags))
177  from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
178  cfg.merge(MetaDataSvcCfg(flags))
179 
180  # Load these objects from StoreGate
181  loadFromSG = []
182  if job.item_eventinfo:
183  if flags.Input.isMC:
184  loadFromSG.append(('EventInfo', 'StoreGateSvc+McEventInfo'))
185  else:
186  loadFromSG.append(('EventInfo', 'StoreGateSvc+EventInfo'))
187  if job.item_eventinfoBS:
188  loadFromSG.append(('EventInfo', 'StoreGateSvc+ByteStreamEventInfo'))
189  if job.item_xaod_eventinfo:
190  loadFromSG.append(('xAOD::EventInfo', 'StoreGateSvc+EventInfo'))
191  if job.item_xaod_TrigConfKeys:
192  loadFromSG.append(('xAOD::TrigConfKeys', 'StoreGateSvc+TrigConfKeys'))
193  if job.item_xaod_TrigDecision:
194  loadFromSG.append(('xAOD::TrigDecision', 'StoreGateSvc+xTrigDecision'))
195  loadFromSG.append(('DataHeader', 'StoreGateSvc+EventSelector'))
196 
197  from SGComps.SGInputLoaderConfig import SGInputLoaderCfg
198  cfg.merge(SGInputLoaderCfg(flags, loadFromSG))
199 
200  if job.item_eventinfoBS:
201  # instruct POOL2EI to read EventInfo
202  job.item_eventinfo = True
203 
204  # algorithm
205  from EventIndexProducer.POOL2EI_Lib import POOL2EI
206  pool2ei = POOL2EI('pool2ei', OutputLevel=INFO, **job)
207  cfg.addEventAlgo(pool2ei)
208 
209  # service
210  from EventIndexProducer.POOL2EI_Lib import POOL2EISvc
211  pool2eisvc = POOL2EISvc(algo=pool2ei)
212  cfg.addService(pool2eisvc, create=True)
213 
214  # Post-include
215  processPostInclude(runArgs, flags, cfg)
216 
217  # Post-exec
218  processPostExec(runArgs, flags, cfg)
219 
220  if flags.Exec.OutputLevel <= INFO:
221  cfg.printConfig()
222 
223  # Run the final accumulator
224  sc = cfg.run()
225 
226  import sys
227  sys.exit(sc.isFailure())
python.TransformUtils.processPreExec
def processPreExec(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:41
vtune_athena.format
format
Definition: vtune_athena.py:14
python.TransformUtils.processPostExec
def processPostExec(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:50
python.TransformUtils.processPostInclude
def processPostInclude(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:69
python.POOLtoEI_Skeleton.fromRunArgs
def fromRunArgs(runArgs)
Definition: POOLtoEI_Skeleton.py:143
python.TransformUtils.processPreInclude
def processPreInclude(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:62
fillPileUpNoiseLumi.next
next
Definition: fillPileUpNoiseLumi.py:52
SGInputLoaderConfig.SGInputLoaderCfg
def SGInputLoaderCfg(flags, Load=None, **kwargs)
Definition: SGInputLoaderConfig.py:7
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:260
Constants
some useful constants -------------------------------------------------—
python.CommonRunArgsToFlags.commonRunArgsToFlags
def commonRunArgsToFlags(runArgs, configFlags)
Definition: CommonRunArgsToFlags.py:12
python.POOLtoEI_Skeleton.configurePOOL2EIglobals
def configurePOOL2EIglobals(runArgs, flags)
Definition: POOLtoEI_Skeleton.py:16
python.MetaDataSvcConfig.MetaDataSvcCfg
def MetaDataSvcCfg(flags, toolNames=[], tools=[])
Definition: MetaDataSvcConfig.py:6
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
python.PyAthenaComps.__setattr__
__setattr__
Definition: PyAthenaComps.py:39
python.PoolReadConfig.PoolReadCfg
def PoolReadCfg(flags)
Definition: PoolReadConfig.py:69