ATLAS Offline Software
DerivationSkeleton.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 import sys
4 
5 from AthenaConfiguration.ComponentFactory import CompFactory
6 from AthenaConfiguration.Enums import ProductionStep
7 from DerivationFrameworkConfiguration import DerivationConfigList
8 from PyJobTransforms.CommonRunArgsToFlags import commonRunArgsToFlags
9 from PyJobTransforms.TransformUtils import processPreExec, processPreInclude, processPostExec, processPostInclude
10 
11 # force no legacy job properties
12 from AthenaCommon import JobProperties
13 JobProperties.jobPropertiesDisallowed = True
14 
15 
16 def fromRunArgs(runArgs):
17  from AthenaCommon.Logging import logging
18  logDerivation = logging.getLogger('Derivation')
19  logDerivation.info('****************** STARTING DERIVATION *****************')
20 
21  logDerivation.info('**** Transformation run arguments')
22  logDerivation.info(str(runArgs))
23 
24  logDerivation.info('**** Setting-up configuration flags')
25  from AthenaConfiguration.AllConfigFlags import initConfigFlags
26  flags = initConfigFlags()
27  flags.Exec.EventPrintoutInterval = 100
28  commonRunArgsToFlags(runArgs, flags)
29 
30  flags.Common.ProductionStep = ProductionStep.Derivation
31 
32  # Switch on PerfMon
33  from PerfMonComps.PerfMonConfigHelpers import setPerfmonFlagsFromRunArgs
34  setPerfmonFlagsFromRunArgs(flags, runArgs)
35 
36  # Input types
37  allowedInputTypes = [ 'AOD', 'DAOD_PHYS', 'EVNT' ]
38  availableInputTypes = [ hasattr(runArgs, f'input{inputType}File') for inputType in allowedInputTypes ]
39  if sum(availableInputTypes) != 1:
40  raise ValueError('Input must be exactly one of the following types: inputAODFile, inputEVNTFile, inputDAOD_PHYSFile')
41  idx = availableInputTypes.index(True)
42  flags.Input.Files = getattr(runArgs, f'input{allowedInputTypes[idx]}File')
43 
44  # Augmentations
45  # For the time being one parent (primary stream) can have multiple children (augmentations)
46  # However, an augmentation cannot have multiple parents, which will be supported in the future
47  if hasattr(runArgs, 'augmentations'):
48  for val in runArgs.augmentations:
49  if ':' not in val or len(val.split(':')) != 2:
50  logDerivation.error('Derivation job started, but with wrong augmentation syntax - aborting')
51  raise ValueError('Invalid augmentation argument: {0}'.format(val))
52  else:
53  child, parent = val.split(':')
54  flags.addFlag(f'Output.DAOD_{child}ParentStream',f'DAOD_{parent}')
55  childStreamFlag = f'Output.DAOD_{parent}ChildStream'
56  if not flags.hasFlag(childStreamFlag):
57  flags.addFlag(childStreamFlag, [f'DAOD_{child}'])
58  else:
59  flags._set(childStreamFlag, flags._get(childStreamFlag) + [f'DAOD_{child}'])
60  logDerivation.info('Setting up event augmentation as {0} => {1}'.format(child, parent))
61 
62  # Output formats
63  formats = []
64  if hasattr(runArgs, 'formats'):
65  formats = runArgs.formats
66  logDerivation.info('Will attempt to make the following derived formats: {0}'.format(formats))
67  else:
68  logDerivation.error('Derivation job started, but with no output formats specified - aborting')
69  raise ValueError('No derived formats specified')
70 
71  # Output files
72  for runArg in dir(runArgs):
73  if 'output' in runArg and 'File' in runArg and 'Type' not in runArg and 'NTUP_PHYSVAL' not in runArg:
74  outputFileName = getattr(runArgs, runArg)
75  flagString = f'Output.{runArg.removeprefix("output")}Name'
76  flags.addFlag(flagString, outputFileName)
77  flags.addFlag(f'Output.doWrite{runArg.removeprefix("output").removesuffix("File")}', True)
78  flags.Output.doWriteDAOD = True
79 
80  # Fix campaign metadata
81  from Campaigns.Utils import Campaign, campaign_runs
82  if flags.Input.isMC and flags.Input.MCCampaign is Campaign.Unknown:
83  if flags.Input.RunNumbers:
84  mc_campaign = campaign_runs.get(flags.Input.RunNumbers[0], Campaign.Unknown)
85 
86  if mc_campaign is not Campaign.Unknown:
87  flags.Input.MCCampaign = mc_campaign
88  logDerivation.info('Will recover MC campaign to: %s', mc_campaign.value)
89 
90  # Pre-include
91  processPreInclude(runArgs, flags)
92 
93  # Pre-exec
94  processPreExec(runArgs, flags)
95 
96  # To respect --athenaopts
97  flags.fillFromArgs()
98 
99  # Lock flags
100  flags.lock()
101 
102  # The D(2)AOD building configuration
103  from AthenaConfiguration.MainServicesConfig import MainServicesCfg
104  cfg = MainServicesCfg(flags)
105 
106  # Pool file reading
107  from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
108  cfg.merge(PoolReadCfg(flags))
109 
110  # Ensure proper metadata propagation for EVNT->DAOD_TRUTHx
111  if (allowedInputTypes[idx]=='EVNT'):
112  from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
113  cfg.merge(MetaDataSvcCfg(flags, ['IOVDbMetaDataTool']))
114 
115  for formatName in formats:
116  derivationConfig = getattr(DerivationConfigList, f'{formatName}Cfg')
117  cfg.merge(derivationConfig(flags))
118 
119  # Pass-through mode (ignore skimming and accept all events)
120  if hasattr(runArgs, 'passThrough'):
121  logDerivation.info('Pass-through mode was requested. All events will be written to the output.')
122  for algo in cfg.getEventAlgos():
123  if isinstance(algo, CompFactory.DerivationFramework.DerivationKernel):
124  algo.SkimmingTools = []
125 
126  # PerfMonSD
127  if flags.PerfMon.doFullMonMT or flags.PerfMon.doFastMonMT:
128  from PerfMonComps.PerfMonCompsConfig import PerfMonMTSvcCfg
129  cfg.merge(PerfMonMTSvcCfg(flags))
130 
131  # Write AMI tag into in-file metadata
132  from PyUtils.AMITagHelperConfig import AMITagCfg
133  cfg.merge(AMITagCfg(flags, runArgs, fixBroken=True))
134 
135  # Fix generator metadata
136  if flags.Input.isMC:
137  from GeneratorConfig.Versioning import GeneratorVersioningFixCfg
138  cfg.merge(GeneratorVersioningFixCfg(flags))
139 
140  # Write TagInfo
141  if not flags.Input.isMC and flags.Input.DataYear > 0:
142  from EventInfoMgt.TagInfoMgrConfig import TagInfoMgrCfg
143  cfg.merge(TagInfoMgrCfg(flags, tagValuePairs={
144  "data_year": str(flags.Input.DataYear)
145  }))
146  else:
147  from EventInfoMgt.TagInfoMgrConfig import TagInfoMgrCfg
148  cfg.merge(TagInfoMgrCfg(flags))
149 
150  # Post-include
151  processPostInclude(runArgs, flags, cfg)
152 
153  # Post-exec
154  processPostExec(runArgs, flags, cfg)
155 
156  # Configure components' logging levels
157  from AthenaConfiguration.Utils import setupLoggingLevels
158  setupLoggingLevels(flags, cfg)
159 
160  # Run the final configuration
161  sc = cfg.run()
162  sys.exit(not sc.isSuccess())
python.TransformUtils.processPreExec
def processPreExec(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:41
vtune_athena.format
format
Definition: vtune_athena.py:14
python.TransformUtils.processPostExec
def processPostExec(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:50
python.TransformUtils.processPostInclude
def processPostInclude(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:69
python.TransformUtils.processPreInclude
def processPreInclude(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:62
python.PerfMonCompsConfig.PerfMonMTSvcCfg
def PerfMonMTSvcCfg(flags, **kwargs)
A minimal new-style configuration for PerfMonMTSvc.
Definition: PerfMonCompsConfig.py:10
python.TagInfoMgrConfig.TagInfoMgrCfg
def TagInfoMgrCfg(flags, tagValuePairs={})
Definition: TagInfoMgrConfig.py:6
python.Utils.setupLoggingLevels
def setupLoggingLevels(flags, ca)
Definition: Control/AthenaConfiguration/python/Utils.py:46
convertTimingResiduals.sum
sum
Definition: convertTimingResiduals.py:55
python.PerfMonConfigHelpers.setPerfmonFlagsFromRunArgs
def setPerfmonFlagsFromRunArgs(flags, runArgs)
Definition: PerfMonConfigHelpers.py:3
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:260
python.DerivationSkeleton.fromRunArgs
def fromRunArgs(runArgs)
Definition: DerivationSkeleton.py:16
python.CommonRunArgsToFlags.commonRunArgsToFlags
def commonRunArgsToFlags(runArgs, configFlags)
Definition: CommonRunArgsToFlags.py:12
python.Versioning.GeneratorVersioningFixCfg
def GeneratorVersioningFixCfg(flags)
Definition: Versioning.py:57
beamspotman.dir
string dir
Definition: beamspotman.py:623
python.AMITagHelperConfig.AMITagCfg
def AMITagCfg(flags, runArgs=None, fixBroken=False)
Definition: AMITagHelperConfig.py:77
python.MetaDataSvcConfig.MetaDataSvcCfg
def MetaDataSvcCfg(flags, toolNames=[], tools=[])
Definition: MetaDataSvcConfig.py:6
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19
str
Definition: BTagTrackIpAccessor.cxx:11
python.PoolReadConfig.PoolReadCfg
def PoolReadCfg(flags)
Definition: PoolReadConfig.py:69