ATLAS Offline Software
Loading...
Searching...
No Matches
python.MergePool_Skeleton Namespace Reference

Functions

 fromRunArgs (runArgs)

Function Documentation

◆ fromRunArgs()

python.MergePool_Skeleton.fromRunArgs ( runArgs)
 This is the main skeleton for merging POOL files generically.
Currently it handles (D)AOD/(D)ESD files, but can be extended in the future.
That'll mostly entail configuring extra components needed for TP conversion (if any).

Definition at line 3 of file MergePool_Skeleton.py.

3def fromRunArgs(runArgs):
4
5 """ This is the main skeleton for merging POOL files generically.
6 Currently it handles (D)AOD/(D)ESD files, but can be extended in the future.
7 That'll mostly entail configuring extra components needed for TP conversion (if any).
8 """
9
10 # Setup logging
11 from AthenaCommon.Logging import logging
12 log = logging.getLogger('MergePool_Skeleton')
13 log.info('****************** STARTING MergePool MERGING *****************')
14
15 # Print arguments
16 log.info('**** Transformation run arguments')
17 log.info(str(runArgs))
18
19 # Setup configuration flags
20 log.info('**** Setting up configuration flags')
21
22 from PyJobTransforms.CommonRunArgsToFlags import commonRunArgsToFlags
23 from AthenaConfiguration.AllConfigFlags import initConfigFlags
24 flags = initConfigFlags()
25 flags.Exec.EventPrintoutInterval = 100
26 commonRunArgsToFlags(runArgs, flags)
27
28 # First let's find the input/output files
29 inputFile, outputFile = None, None
30
31 for attr in dir(runArgs):
32 if attr.startswith('input') and attr.endswith('File'):
33 inputFile = getattr(runArgs, attr)
34 elif attr.startswith('output') and attr.endswith('File'):
35 outputFile = getattr(runArgs, attr)
36
37 if not inputFile or not outputFile:
38 raise RuntimeError('Could NOT determine the input/output files!')
39
40 # Now set the input files before we attempt to read the processing tags
41 flags.Input.Files = inputFile
42
43 # Now figure out what stream type we're trying to merge
44 streamToMerge = flags.Input.ProcessingTags[0].removeprefix('Stream') if flags.Input.ProcessingTags else None
45
46 if not streamToMerge:
47 raise RuntimeError('Could NOT determine the stream type!')
48
49 # Now set the output file name and add additional flags
50 # For known formats, e.g., ESD, AOD, we already have the associated flags
51 # However, for other formats, e.g., DAOD_XYZ, we need to create the flags
52 try:
53 setattr(flags.Output, f'{streamToMerge}FileName', outputFile)
54 except RuntimeError: # If a flag doesn't exist CA throws a runtime error
55 flags.addFlag(f'Output.{streamToMerge}FileName', outputFile)
56 flags.addFlag(f'Output.doWrite{streamToMerge}', True)
57 if 'DAOD' in streamToMerge:
58 flags.Output.doWriteDAOD = True
59
60 # Setup perfmon flags from runargs
61 from PerfMonComps.PerfMonConfigHelpers import setPerfmonFlagsFromRunArgs
62 setPerfmonFlagsFromRunArgs(flags, runArgs)
63
64 # Pre-include
65 from PyJobTransforms.TransformUtils import processPreExec, processPreInclude, processPostExec, processPostInclude
66 log.info('**** Processing preInclude')
67 processPreInclude(runArgs, flags)
68
69 # Pre-exec
70 log.info('**** Processing preExec')
71 processPreExec(runArgs, flags)
72
73 # To respect --athenaopts
74 log.info('**** Processing athenaopts')
75 flags.fillFromArgs()
76
77 # Lock configuration flags
78 log.info('**** Locking configuration flags')
79 flags.lock()
80
81 # Set up necessary job components
82 log.info('**** Setting up job components')
83
84 # Main services
85 from AthenaConfiguration.MainServicesConfig import MainServicesCfg
86 cfg = MainServicesCfg(flags)
87
88 # Input reading
89 from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
90 cfg.merge(PoolReadCfg(flags))
91
92 # Output writing
93
94 # Configure the output stream
95 from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg, outputStreamName
96 cfg.merge(OutputStreamCfg(flags, streamToMerge, takeItemsFromInput = True, extendProvenanceRecord = False))
97 Stream = cfg.getEventAlgo(outputStreamName(streamToMerge))
98 Stream.ForceRead = True
99 # Add in-file MetaData
100 from xAODMetaDataCnv.InfileMetaDataConfig import SetupMetaDataForStreamCfg
101 from AthenaConfiguration.Enums import MetadataCategory
102
103 cfg.merge(
104 SetupMetaDataForStreamCfg(
105 flags,
106 streamToMerge,
107 createMetadata=[
108 MetadataCategory.IOVMetaData,
109 ],
110 )
111 )
112
113 log.info(f'**** Configured {streamToMerge} writing')
114
115 # Configure extra bits that are needed for TP conversion
116 for item in flags.Input.TypedCollections:
117 ctype, cname = item.split('#')
118 if ctype.startswith('Trk') or ctype.startswith('InDet'):
119 from TrkEventCnvTools.TrkEventCnvToolsConfig import TrkEventCnvSuperToolCfg
120 cfg.merge(TrkEventCnvSuperToolCfg(flags))
121 if ctype.startswith('Calo') or ctype.startswith('LAr'):
122 from LArGeoAlgsNV.LArGMConfig import LArGMCfg
123 cfg.merge(LArGMCfg(flags))
124 if ctype.startswith('Calo') or ctype.startswith('Tile'):
125 from TileGeoModel.TileGMConfig import TileGMCfg
126 cfg.merge(TileGMCfg(flags))
127 if ctype.startswith('Muon'):
128 from MuonConfig.MuonGeometryConfig import MuonGeoModelCfg
129 cfg.merge(MuonGeoModelCfg(flags))
130
131 # Needed for merging in MT
132 if 'ESD' in streamToMerge:
133 Stream.ExtraInputs.add(
134 ( 'MuonGM::MuonDetectorManager',
135 'ConditionStore+MuonDetectorManager' ) )
136 Stream.ExtraInputs.add(
137 ( 'InDetDD::SiDetectorElementCollection',
138 'ConditionStore+PixelDetectorElementCollection' ) )
139 Stream.ExtraInputs.add(
140 ( 'InDetDD::SiDetectorElementCollection',
141 'ConditionStore+SCT_DetectorElementCollection' ) )
142 Stream.ExtraInputs.add(
143 ( 'InDetDD::TRT_DetElementContainer',
144 'ConditionStore+TRT_DetElementContainer' ) )
145
146 # Post-include
147 log.info('**** Processing postInclude')
148 processPostInclude(runArgs, flags, cfg)
149
150 # Post-exec
151 log.info('**** Processing postExec')
152 processPostExec(runArgs, flags, cfg)
153
154 # Now run the job and exit accordingly
155 sc = cfg.run()
156 import sys
157 sys.exit(not sc.isSuccess())