ATLAS Offline Software
MergePool_Skeleton.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
2 
def fromRunArgs(runArgs):
    """Main skeleton for merging POOL files generically.

    Currently it handles (D)AOD/(D)ESD files, but can be extended in the future.
    That'll mostly entail configuring extra components needed for TP conversion
    (if any).

    The function builds the configuration flags from ``runArgs``, sets up
    POOL reading and an output stream that copies all items from the input,
    runs the job and terminates the process via ``sys.exit``.

    Args:
        runArgs: Transform run-arguments object. It must expose exactly the
            attributes ``input<Something>File`` and ``output<Something>File``;
            these are discovered by name.

    Raises:
        RuntimeError: If no input/output file attribute is found (or it is
            empty), or if the stream type cannot be derived from the
            processing tags of the input.
        SystemExit: Always, once the job has run; the exit code is ``0`` when
            the returned status code reports success and ``1`` otherwise.
    """

    # Setup logging
    from AthenaCommon.Logging import logging
    log = logging.getLogger('MergePool_Skeleton')
    log.info('****************** STARTING MergePool MERGING *****************')

    # Print arguments
    log.info('**** Transformation run arguments')
    log.info(str(runArgs))

    # Setup configuration flags
    log.info('**** Setting up configuration flags')

    from PyJobTransforms.CommonRunArgsToFlags import commonRunArgsToFlags
    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    flags = initConfigFlags()
    flags.Exec.EventPrintoutInterval = 100
    commonRunArgsToFlags(runArgs, flags)

    # First let's find the input/output files.
    # The attribute names depend on the format being merged, so they are
    # matched by pattern. If several attributes match, the last one in
    # dir() order (alphabetical) wins.
    inputFile, outputFile = None, None

    for attr in dir(runArgs):
        if attr.startswith('input') and attr.endswith('File'):
            inputFile = getattr(runArgs, attr)
        elif attr.startswith('output') and attr.endswith('File'):
            outputFile = getattr(runArgs, attr)

    if not inputFile or not outputFile:
        raise RuntimeError('Could NOT determine the input/output files!')

    # Now set the input files before we attempt to read the processing tags
    flags.Input.Files = inputFile

    # Now figure out what stream type we're trying to merge:
    # take the first processing tag and drop its 'Stream' prefix.
    streamToMerge = flags.Input.ProcessingTags[0].removeprefix('Stream') if flags.Input.ProcessingTags else None

    if not streamToMerge:
        raise RuntimeError('Could NOT determine the stream type!')

    # Now set the output file name and add additional flags
    # that are necessary for the derived formats
    if 'DAOD' in streamToMerge or 'DESD' in streamToMerge:
        # Derived formats get their flags created here with addFlag
        flags.addFlag(f'Output.{streamToMerge}FileName', outputFile)
        flags.addFlag(f'Output.doWrite{streamToMerge}', True)
        if 'DAOD' in streamToMerge:
            flags.Output.doWriteDAOD = True
    else:
        # Otherwise the matching Output.<stream>FileName flag is assigned directly
        setattr(flags.Output, f'{streamToMerge}FileName', outputFile)

    # Setup perfmon flags from runargs
    from PerfMonComps.PerfMonConfigHelpers import setPerfmonFlagsFromRunArgs
    setPerfmonFlagsFromRunArgs(flags, runArgs)

    # Pre-include
    from PyJobTransforms.TransformUtils import processPreExec, processPreInclude, processPostExec, processPostInclude
    log.info('**** Processing preInclude')
    processPreInclude(runArgs, flags)

    # Pre-exec
    log.info('**** Processing preExec')
    processPreExec(runArgs, flags)

    # To respect --athenaopts
    log.info('**** Processing athenaopts')
    flags.fillFromArgs()

    # Lock configuration flags
    log.info('**** Locking configuration flags')
    flags.lock()

    # Set up necessary job components
    log.info('**** Setting up job components')

    # Main services
    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    cfg = MainServicesCfg(flags)

    # Input reading
    from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
    cfg.merge(PoolReadCfg(flags))

    # Output writing

    # Configure the output stream: take the item list from the input file
    # and do not extend the provenance record.
    from OutputStreamAthenaPool.OutputStreamConfig import OutputStreamCfg, outputStreamName
    cfg.merge(OutputStreamCfg(flags, streamToMerge, takeItemsFromInput = True, extendProvenanceRecord = False))
    Stream = cfg.getEventAlgo(outputStreamName(streamToMerge))
    Stream.ForceRead = True
    # Add in-file MetaData
    from xAODMetaDataCnv.InfileMetaDataConfig import SetupMetaDataForStreamCfg
    cfg.merge(
        SetupMetaDataForStreamCfg(
            flags,
            streamToMerge,
        )
    )

    log.info(f'**** Configured {streamToMerge} writing')

    # Configure extra bits that are needed for TP conversion.
    # Each typed collection is '<type>#<name>'; only the type prefix decides
    # which extra configuration is merged in.
    for item in flags.Input.TypedCollections:
        ctype, _cname = item.split('#')
        if ctype.startswith('Trk'):
            from TrkEventCnvTools.TrkEventCnvToolsConfig import TrkEventCnvSuperToolCfg
            cfg.merge(TrkEventCnvSuperToolCfg(flags))
        if ctype.startswith(('Calo', 'LAr')):
            from LArGeoAlgsNV.LArGMConfig import LArGMCfg
            cfg.merge(LArGMCfg(flags))
        if ctype.startswith(('Calo', 'Tile')):
            from TileGeoModel.TileGMConfig import TileGMCfg
            cfg.merge(TileGMCfg(flags))
        if ctype.startswith('Muon'):
            from MuonConfig.MuonGeometryConfig import MuonGeoModelCfg
            cfg.merge(MuonGeoModelCfg(flags))

    # Add PerfMon
    if flags.PerfMon.doFastMonMT or flags.PerfMon.doFullMonMT:
        from PerfMonComps.PerfMonCompsConfig import PerfMonMTSvcCfg
        cfg.merge(PerfMonMTSvcCfg(flags))

    # Post-include
    log.info('**** Processing postInclude')
    processPostInclude(runArgs, flags, cfg)

    # Post-exec
    log.info('**** Processing postExec')
    processPostExec(runArgs, flags, cfg)

    # Now run the job and exit accordingly
    # (exit code 0 on success, 1 otherwise)
    sc = cfg.run()
    import sys
    sys.exit(not sc.isSuccess())
AthenaPoolExample_WriteCond.outputStreamName
string outputStreamName
Definition: AthenaPoolExample_WriteCond.py:21
python.TransformUtils.processPreExec
def processPreExec(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:41
python.TransformUtils.processPostExec
def processPostExec(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:50
python.OutputStreamConfig.OutputStreamCfg
def OutputStreamCfg(flags, streamName, ItemList=[], MetadataItemList=[], disableEventTag=False, trigNavThinningSvc=None, takeItemsFromInput=False, extendProvenanceRecord=True, AcceptAlgs=[], HelperTools=[])
Definition: OutputStreamConfig.py:12
python.TransformUtils.processPostInclude
def processPostInclude(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:69
python.TransformUtils.processPreInclude
def processPreInclude(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:62
python.PerfMonCompsConfig.PerfMonMTSvcCfg
def PerfMonMTSvcCfg(flags, **kwargs)
A minimal new-style configuration for PerfMonMTSvc.
Definition: PerfMonCompsConfig.py:10
python.MergePool_Skeleton.fromRunArgs
def fromRunArgs(runArgs)
Definition: MergePool_Skeleton.py:3
python.TrkEventCnvToolsConfig.TrkEventCnvSuperToolCfg
def TrkEventCnvSuperToolCfg(flags, name='EventCnvSuperTool', **kwargs)
Definition: TrkEventCnvToolsConfig.py:51
python.PerfMonConfigHelpers.setPerfmonFlagsFromRunArgs
def setPerfmonFlagsFromRunArgs(flags, runArgs)
Definition: PerfMonConfigHelpers.py:3
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:256
python.CommonRunArgsToFlags.commonRunArgsToFlags
def commonRunArgsToFlags(runArgs, configFlags)
Definition: CommonRunArgsToFlags.py:12
LArGMConfig.LArGMCfg
def LArGMCfg(flags)
Definition: LArGMConfig.py:8
beamspotman.dir
string dir
Definition: beamspotman.py:623
python.MuonGeometryConfig.MuonGeoModelCfg
def MuonGeoModelCfg(flags)
Definition: MuonGeometryConfig.py:28
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19
str
Definition: BTagTrackIpAccessor.cxx:11
InfileMetaDataConfig.SetupMetaDataForStreamCfg
def SetupMetaDataForStreamCfg(flags, streamName="", AcceptAlgs=None, createMetadata=None, propagateMetadataFromInput=True, *args, **kwargs)
Definition: InfileMetaDataConfig.py:216
python.PoolReadConfig.PoolReadCfg
def PoolReadCfg(flags)
Definition: PoolReadConfig.py:69
TileGMConfig.TileGMCfg
def TileGMCfg(flags)
Definition: TileGMConfig.py:7