ATLAS Offline Software
MergePool_Skeleton.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
2 
def fromRunArgs(runArgs):
    """Main skeleton for merging POOL files generically.

    Currently it handles (D)AOD/(D)ESD files, but can be extended in the future.
    That'll mostly entail configuring extra components needed for TP conversion (if any).

    The function builds the configuration flags from ``runArgs``, sets up an
    output stream that takes its item list from the input, runs the job and
    terminates the process.

    Args:
        runArgs: Transform run-arguments object. Its attributes named
            ``input*File`` and ``output*File`` provide the input and output
            files; it is also handed to the common/perfmon flag helpers and
            to the pre/post include/exec processors.

    Raises:
        RuntimeError: If no input or output file attribute is found on
            ``runArgs``, or if the stream type cannot be derived from the
            processing tags of the input.

    Note:
        This function does not return: it calls ``sys.exit`` with
        ``not sc.isSuccess()``, i.e. exit status 0 when the job succeeded
        and 1 otherwise.
    """

    # Setup logging
    from AthenaCommon.Logging import logging
    log = logging.getLogger('MergePool_Skeleton')
    log.info('****************** STARTING MergePool MERGING *****************')

    # Print arguments
    log.info('**** Transformation run arguments')
    log.info(str(runArgs))

    # Setup configuration flags
    log.info('**** Setting up configuration flags')

    from PyJobTransforms.CommonRunArgsToFlags import commonRunArgsToFlags
    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    flags = initConfigFlags()
    flags.Exec.EventPrintoutInterval = 100
    commonRunArgsToFlags(runArgs, flags)

    # First let's find the input/output files.
    # dir() yields names in alphabetical order; if several attributes match
    # the same pattern, the last one visited wins.
    inputFile, outputFile = None, None

    for attr in dir(runArgs):
        if not attr.endswith('File'):
            continue
        if attr.startswith('input'):
            inputFile = getattr(runArgs, attr)
        elif attr.startswith('output'):
            outputFile = getattr(runArgs, attr)

    if not inputFile or not outputFile:
        raise RuntimeError('Could NOT determine the input/output files!')

    # Now set the input files before we attempt to read the processing tags
    flags.Input.Files = inputFile

    # Now figure out what stream type we're trying to merge:
    # the first processing tag with any leading 'Stream' stripped off
    processingTags = flags.Input.ProcessingTags
    streamToMerge = processingTags[0].removeprefix('Stream') if processingTags else None

    if not streamToMerge:
        raise RuntimeError('Could NOT determine the stream type!')

    # Now set the output file name and add additional flags
    # For known formats, e.g., ESD, AOD, we already have the associated flags
    # However, for other formats, e.g., DAOD_XYZ, we need to create the flags
    try:
        setattr(flags.Output, f'{streamToMerge}FileName', outputFile)
    except RuntimeError:  # If a flag doesn't exist CA throws a runtime error
        flags.addFlag(f'Output.{streamToMerge}FileName', outputFile)
        flags.addFlag(f'Output.doWrite{streamToMerge}', True)
        if 'DAOD' in streamToMerge:
            flags.Output.doWriteDAOD = True

    # Setup perfmon flags from runargs
    from PerfMonComps.PerfMonConfigHelpers import setPerfmonFlagsFromRunArgs
    setPerfmonFlagsFromRunArgs(flags, runArgs)

    # Pre-include
    from PyJobTransforms.TransformUtils import processPreExec, processPreInclude, processPostExec, processPostInclude
    log.info('**** Processing preInclude')
    processPreInclude(runArgs, flags)

    # Pre-exec
    log.info('**** Processing preExec')
    processPreExec(runArgs, flags)

    # To respect --athenaopts
    log.info('**** Processing athenaopts')
    flags.fillFromArgs()

    # Lock configuration flags
    log.info('**** Locking configuration flags')
    flags.lock()

    # Set up necessary job components
    log.info('**** Setting up job components')

    # Main services
    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    cfg = MainServicesCfg(flags)

    # Input reading
    from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
    cfg.merge(PoolReadCfg(flags))

    # Output writing

    # Configure the output stream: items are taken from the input file and
    # the provenance record is not extended
    from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg, outputStreamName
    cfg.merge(OutputStreamCfg(flags, streamToMerge, takeItemsFromInput=True, extendProvenanceRecord=False))
    Stream = cfg.getEventAlgo(outputStreamName(streamToMerge))
    Stream.ForceRead = True

    # Add in-file MetaData
    from xAODMetaDataCnv.InfileMetaDataConfig import SetupMetaDataForStreamCfg
    from AthenaConfiguration.Enums import MetadataCategory

    cfg.merge(
        SetupMetaDataForStreamCfg(
            flags,
            streamToMerge,
            createMetadata=[
                MetadataCategory.IOVMetaData,
            ],
        )
    )

    log.info(f'**** Configured {streamToMerge} writing')

    # Configure extra bits that are needed for TP conversion.
    # Each typed collection is expected to be of the form '<type>#<name>';
    # only the type prefix decides which extra configuration is merged.
    for item in flags.Input.TypedCollections:
        ctype, _ = item.split('#')
        if ctype.startswith(('Trk', 'InDet')):
            from TrkEventCnvTools.TrkEventCnvToolsConfig import TrkEventCnvSuperToolCfg
            cfg.merge(TrkEventCnvSuperToolCfg(flags))
        if ctype.startswith(('Calo', 'LAr')):
            from LArGeoAlgsNV.LArGMConfig import LArGMCfg
            cfg.merge(LArGMCfg(flags))
        if ctype.startswith(('Calo', 'Tile')):
            from TileGeoModel.TileGMConfig import TileGMCfg
            cfg.merge(TileGMCfg(flags))
        if ctype.startswith('Muon'):
            from MuonConfig.MuonGeometryConfig import MuonGeoModelCfg
            cfg.merge(MuonGeoModelCfg(flags))

    # Add PerfMon
    if flags.PerfMon.doFastMonMT or flags.PerfMon.doFullMonMT:
        from PerfMonComps.PerfMonCompsConfig import PerfMonMTSvcCfg
        cfg.merge(PerfMonMTSvcCfg(flags))

    # Post-include
    log.info('**** Processing postInclude')
    processPostInclude(runArgs, flags, cfg)

    # Post-exec
    log.info('**** Processing postExec')
    processPostExec(runArgs, flags, cfg)

    # Now run the job and exit accordingly
    sc = cfg.run()
    import sys
    sys.exit(not sc.isSuccess())
AthenaPoolExample_WriteCond.outputStreamName
string outputStreamName
Definition: AthenaPoolExample_WriteCond.py:21
python.TransformUtils.processPreExec
def processPreExec(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:41
python.TransformUtils.processPostExec
def processPostExec(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:50
python.OutputStreamConfig.OutputStreamCfg
def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[], disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, AcceptAlgs=[], HelperTools=[])
Definition: OutputStreamConfig.py:12
python.TransformUtils.processPostInclude
def processPostInclude(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:69
python.TransformUtils.processPreInclude
def processPreInclude(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:62
python.PerfMonCompsConfig.PerfMonMTSvcCfg
def PerfMonMTSvcCfg(flags, **kwargs)
A minimal new-style configuration for PerfMonMTSvc.
Definition: PerfMonCompsConfig.py:10
python.MergePool_Skeleton.fromRunArgs
def fromRunArgs(runArgs)
Definition: MergePool_Skeleton.py:3
python.TrkEventCnvToolsConfig.TrkEventCnvSuperToolCfg
def TrkEventCnvSuperToolCfg(flags, name='EventCnvSuperTool', **kwargs)
Definition: TrkEventCnvToolsConfig.py:51
python.PerfMonConfigHelpers.setPerfmonFlagsFromRunArgs
def setPerfmonFlagsFromRunArgs(flags, runArgs)
Definition: PerfMonConfigHelpers.py:3
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:260
python.CommonRunArgsToFlags.commonRunArgsToFlags
def commonRunArgsToFlags(runArgs, configFlags)
Definition: CommonRunArgsToFlags.py:12
LArGMConfig.LArGMCfg
def LArGMCfg(flags)
Definition: LArGMConfig.py:8
beamspotman.dir
string dir
Definition: beamspotman.py:623
python.MuonGeometryConfig.MuonGeoModelCfg
def MuonGeoModelCfg(flags)
Definition: MuonGeometryConfig.py:28
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19
str
Definition: BTagTrackIpAccessor.cxx:11
InfileMetaDataConfig.SetupMetaDataForStreamCfg
def SetupMetaDataForStreamCfg(flags, streamName="", AcceptAlgs=None, createMetadata=None, propagateMetadataFromInput=True, *args, **kwargs)
Definition: InfileMetaDataConfig.py:219
python.PoolReadConfig.PoolReadCfg
def PoolReadCfg(flags)
Definition: PoolReadConfig.py:69
TileGMConfig.TileGMCfg
def TileGMCfg(flags)
Definition: TileGMConfig.py:7