ATLAS Offline Software
L1TopoByteStreamConfig.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
2 '''
3 Functions creating ComponentAccumulator with ByteStream converters for L1Topo objects
4 '''
5 
6 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
7 from TriggerJobOpts.TriggerByteStreamConfig import ByteStreamReadCfg
8 
9 def L1TopoPhase1ByteStreamToolCfg(flags, name, writeBS=False):
10  from libpyeformat_helper import SourceIdentifier, SubDetector
11  from AthenaConfiguration.ComponentFactory import CompFactory
12 
13  acc = ComponentAccumulator()
14  tool = CompFactory.L1TopoPhase1ByteStreamTool(name)
15  moduleids = [0x1800]
16  tool.ROBIDs = [int(SourceIdentifier(SubDetector.TDAQ_CALO_TOPO_PROC, moduleid)) for moduleid in moduleids]
17 
18  if writeBS:
19  tool.L1TopoPhase1RAWDataReadContainer = "L1_Phase1L1TopoRAWData"
20  tool.L1TopoPhase1RAWDataWriteContainer = ""
21  else:
22  tool.L1TopoPhase1RAWDataReadContainer = ""
23  tool.L1TopoPhase1RAWDataWriteContainer = "L1_Phase1L1TopoRAWData"
24 
25  acc.setPrivateTools(tool)
26  return acc
27 
29  typeNamesToDecode = ["L1TopoRDOCollection/L1TopoRDOCollection",
30  "SG::AuxVectorBase/L1TopoRDOCollection"]
31  return ByteStreamReadCfg(flags, typeNamesToDecode)
32 
33 
35  typeNamesToDecode = ["xAOD::L1TopoRawDataContainer/L1TopoRawData",
36  "xAOD::L1TopoRawDataAuxContainer/L1TopoRawDataAux."]
37  return ByteStreamReadCfg(flags, typeNamesToDecode)
38 
39 
41  acc = ComponentAccumulator()
42  acc.merge(L1TopoRDOCollectionBSCnvCfg(flags))
43  acc.merge(L1TopoRawDataContainerBSCnvCfg(flags))
44  return acc
45 
46 
47 if __name__ == '__main__':
48  '''Run a functional unit test if module is executed'''
49 
50  from AthenaConfiguration.AllConfigFlags import initConfigFlags
51  from AthenaConfiguration.TestDefaults import defaultTestFiles
52  from AthenaConfiguration.MainServicesConfig import MainServicesCfg
53  from AthenaCommon.Constants import DEBUG
54 
55  flags = initConfigFlags()
56  flags.Exec.OutputLevel = DEBUG # noqa: ATL900 - fine for short unit test
57  flags.Input.Files = defaultTestFiles.RAW_RUN2
58  flags.lock()
59 
60  acc = MainServicesCfg(flags)
61  acc.merge(L1TopoByteStreamCfg(flags))
62  acc.printConfig()
63 
64  with open('L1TopoByteStreamConfig.pkl', 'wb') as pkl:
65  acc.store(pkl)
66 
67  acc.run(10)
L1TopoByteStreamConfig.L1TopoPhase1ByteStreamToolCfg
def L1TopoPhase1ByteStreamToolCfg(flags, name, writeBS=False)
Definition: L1TopoByteStreamConfig.py:9
L1TopoByteStreamConfig.L1TopoByteStreamCfg
def L1TopoByteStreamCfg(flags)
Definition: L1TopoByteStreamConfig.py:40
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.ByteStreamConfig.ByteStreamReadCfg
def ByteStreamReadCfg(flags, type_names=None)
Definition: Event/ByteStreamCnvSvc/python/ByteStreamConfig.py:25
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:256
Constants
some useful constants -------------------------------------------------—
L1TopoByteStreamConfig.L1TopoRawDataContainerBSCnvCfg
def L1TopoRawDataContainerBSCnvCfg(flags)
Definition: L1TopoByteStreamConfig.py:34
L1TopoByteStreamConfig.L1TopoRDOCollectionBSCnvCfg
def L1TopoRDOCollectionBSCnvCfg(flags)
Definition: L1TopoByteStreamConfig.py:28
Trk::open
@ open
Definition: BinningType.h:40
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19