ATLAS Offline Software
Loading...
Searching...
No Matches
L1TopoByteStreamConfig.py
Go to the documentation of this file.
1# Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
2'''
3Functions creating ComponentAccumulator with ByteStream converters for L1Topo objects
4'''
5
6from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
7from TriggerJobOpts.TriggerByteStreamConfig import ByteStreamReadCfg
8
def L1TopoPhase1ByteStreamToolCfg(flags, name, writeBS=False):
    """Build a ComponentAccumulator holding an L1TopoPhase1ByteStreamTool.

    The tool is created with the given instance name, given the ROB IDs of
    the TDAQ_CALO_TOPO_PROC sub-detector modules, and returned as the
    private tool of the accumulator.

    Args:
        flags: configuration flags; accepted for interface uniformity but
            not read by this function.
        name: instance name passed to the tool constructor.
        writeBS: selects which of the two container properties is filled.
            When true the read-container key is set and the write-container
            key is left empty; when false it is the other way round.
            (Presumably true means "encode to ByteStream" and false means
            "decode from ByteStream" -- confirm against the tool's C++.)

    Returns:
        ComponentAccumulator whose private tool is the configured tool.
    """
    from libpyeformat_helper import SourceIdentifier, SubDetector
    from AthenaConfiguration.ComponentFactory import CompFactory

    acc = ComponentAccumulator()
    tool = CompFactory.L1TopoPhase1ByteStreamTool(name)

    # One ROB ID per module ID, combined with the sub-detector identifier.
    moduleids = [0x1800]
    tool.ROBIDs = [
        int(SourceIdentifier(SubDetector.TDAQ_CALO_TOPO_PROC, moduleid))
        for moduleid in moduleids
    ]

    # Exactly one of the two container keys is non-empty.
    if writeBS:
        tool.L1TopoPhase1RAWDataReadContainer = "L1_Phase1L1TopoRAWData"
        tool.L1TopoPhase1RAWDataWriteContainer = ""
    else:
        tool.L1TopoPhase1RAWDataReadContainer = ""
        tool.L1TopoPhase1RAWDataWriteContainer = "L1_Phase1L1TopoRAWData"

    acc.setPrivateTools(tool)
    return acc
27
# NOTE(review): the `def` line was missing from this listing; the name and
# the single `flags` parameter are taken from the call site
# `L1TopoRDOCollectionBSCnvCfg(flags)` -- confirm against the original file.
def L1TopoRDOCollectionBSCnvCfg(flags):
    """Return the ByteStream read configuration for L1TopoRDOCollection.

    Requests decoding of the L1TopoRDOCollection object together with its
    SG::AuxVectorBase entry, and delegates to ByteStreamReadCfg.
    """
    typeNamesToDecode = ["L1TopoRDOCollection/L1TopoRDOCollection",
                         "SG::AuxVectorBase/L1TopoRDOCollection"]
    return ByteStreamReadCfg(flags, typeNamesToDecode)
32
33
# NOTE(review): the `def` line was missing from this listing; the name and
# the single `flags` parameter are taken from the call site
# `L1TopoRawDataContainerBSCnvCfg(flags)` -- confirm against the original file.
def L1TopoRawDataContainerBSCnvCfg(flags):
    """Return the ByteStream read configuration for the xAOD L1Topo raw data.

    Requests decoding of the xAOD::L1TopoRawDataContainer and its auxiliary
    container, and delegates to ByteStreamReadCfg.
    """
    typeNamesToDecode = ["xAOD::L1TopoRawDataContainer/L1TopoRawData",
                         "xAOD::L1TopoRawDataAuxContainer/L1TopoRawDataAux."]
    return ByteStreamReadCfg(flags, typeNamesToDecode)
38
39
# NOTE(review): the `def` line was missing from this listing; the name and
# the single `flags` parameter are taken from the call
# `L1TopoByteStreamCfg(flags)` in the module's self-test -- confirm against
# the original file.
def L1TopoByteStreamCfg(flags):
    """Return one accumulator merging both L1Topo ByteStream read configs.

    Merges the results of L1TopoRDOCollectionBSCnvCfg and
    L1TopoRawDataContainerBSCnvCfg, in that order, into a fresh
    ComponentAccumulator.
    """
    acc = ComponentAccumulator()
    acc.merge(L1TopoRDOCollectionBSCnvCfg(flags))
    acc.merge(L1TopoRawDataContainerBSCnvCfg(flags))
    return acc
45
46
if __name__ == '__main__':
    # Run a functional unit test if module is executed

    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    from AthenaConfiguration.TestDefaults import defaultTestFiles
    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    from AthenaCommon.Constants import DEBUG

    # Set up the flags with a default RAW test input, then freeze them
    # before any configuration is built from them.
    flags = initConfigFlags()
    flags.Exec.OutputLevel = DEBUG  # noqa: ATL900 - fine for short unit test
    flags.Input.Files = defaultTestFiles.RAW_RUN2
    flags.lock()

    # Main services plus the L1Topo ByteStream configuration.
    acc = MainServicesCfg(flags)
    acc.merge(L1TopoByteStreamCfg(flags))
    acc.printConfig()

    # Store the merged configuration to a pickle file.
    with open('L1TopoByteStreamConfig.pkl', 'wb') as pkl:
        acc.store(pkl)

    # Run with an argument of 10 (presumably the event count -- confirm).
    acc.run(10)
L1TopoPhase1ByteStreamToolCfg(flags, name, writeBS=False)