Loading [MathJax]/extensions/tex2jax.js
ATLAS Offline Software
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Functions | Variables
python.DerivationSkeleton Namespace Reference

Functions

def fromRunArgs (runArgs)
 

Variables

 jobPropertiesDisallowed
 

Function Documentation

◆ fromRunArgs()

def python.DerivationSkeleton.fromRunArgs (   runArgs)

Definition at line 16 of file DerivationSkeleton.py.

def fromRunArgs(runArgs):
    """Configure and run a derivation (DAOD) job from transform run arguments.

    Builds the configuration flags from ``runArgs``, locks them, assembles the
    component accumulator for every requested derivation format, runs it and
    terminates the process with the job's status.

    Attributes read from ``runArgs`` (each is optional unless stated):
      * ``input<TYPE>File`` -- exactly one of AOD, DAOD_PHYS, DAOD_PHYSLITE or
        EVNT must be present.
      * ``formats`` -- required; names of the derivation formats to produce.
      * ``augmentations`` -- iterable of ``'child:parent'`` strings.
      * ``skimmingExpression`` / ``skimmingContainers`` -- command-line
        skimming, only accepted for the single format ``SKIM`` with
        DAOD_PHYS or DAOD_PHYSLITE input.
      * ``output...File`` -- every matching attribute is turned into output
        flags.
      * ``passThrough`` -- if the attribute exists, skimming tools are removed
        from every DerivationKernel algorithm.

    Raises:
        ValueError: if the input type is missing or ambiguous, an augmentation
            is not of the form ``child:parent``, no formats are given, or the
            command-line skimming restrictions are violated.

    This function does not return: it ends with ``sys.exit``, using exit code
    0 when the job's status code reports success and 1 otherwise.
    """
    from AthenaCommon.Logging import logging
    logDerivation = logging.getLogger('Derivation')
    logDerivation.info('****************** STARTING DERIVATION *****************')

    logDerivation.info('**** Transformation run arguments')
    logDerivation.info(str(runArgs))

    logDerivation.info('**** Setting-up configuration flags')
    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    flags = initConfigFlags()
    flags.Exec.EventPrintoutInterval = 100
    commonRunArgsToFlags(runArgs, flags)

    flags.Common.ProductionStep = ProductionStep.Derivation

    # Switch on PerfMon
    from PerfMonComps.PerfMonConfigHelpers import setPerfmonFlagsFromRunArgs
    setPerfmonFlagsFromRunArgs(flags, runArgs)

    # Input types: exactly one of the allowed input file arguments must be set
    allowedInputTypes = ['AOD', 'DAOD_PHYS', 'DAOD_PHYSLITE', 'EVNT']
    presentInputTypes = [
        inputType for inputType in allowedInputTypes
        if hasattr(runArgs, f'input{inputType}File')
    ]
    if len(presentInputTypes) != 1:
        # Report the allowed type names (not the per-type availability
        # booleans) so that the message is actionable.
        raise ValueError('Input must be exactly one of the following types: '
                         + ','.join(allowedInputTypes))
    inputType = presentInputTypes[0]
    flags.Input.Files = getattr(runArgs, f'input{inputType}File')

    # Augmentations
    # For the time being one parent (primary stream) can have multiple children (augmentations)
    # However, an augmentation cannot have multiple parents, which will be supported in the future
    if hasattr(runArgs, 'augmentations'):
        for val in runArgs.augmentations:
            # A valid entry contains exactly one ':' separating child and parent
            parts = val.split(':')
            if len(parts) != 2:
                logDerivation.error('Derivation job started, but with wrong augmentation syntax - aborting')
                raise ValueError(f'Invalid augmentation argument: {val}')
            child, parent = parts
            flags.addFlag(f'Output.DAOD_{child}ParentStream', f'DAOD_{parent}')
            childStreamFlag = f'Output.DAOD_{parent}ChildStream'
            if not flags.hasFlag(childStreamFlag):
                flags.addFlag(childStreamFlag, [f'DAOD_{child}'])
            else:
                # The parent already has children: append to the existing list
                flags._set(childStreamFlag, flags._get(childStreamFlag) + [f'DAOD_{child}'])
            logDerivation.info('Setting up event augmentation as %s => %s', child, parent)

    # Output formats
    if not hasattr(runArgs, 'formats'):
        logDerivation.error('Derivation job started, but with no output formats specified - aborting')
        raise ValueError('No derived formats specified')
    formats = runArgs.formats
    logDerivation.info('Will attempt to make the following derived formats: %s', formats)

    # Command line skimming - limited to SKIM format and PHYS/PHYSLITE input
    if getattr(runArgs, 'skimmingExpression', None):
        if not (len(formats) == 1 and formats[0] == 'SKIM'):
            raise ValueError('Command-line skimming only available with SKIM format')
        skimmingInputTypes = ['DAOD_PHYS', 'DAOD_PHYSLITE']
        nSkimmingInputs = sum(
            hasattr(runArgs, f'input{skimType}File') for skimType in skimmingInputTypes
        )
        if nSkimmingInputs != 1:
            # List the supported type names rather than availability booleans
            raise ValueError('Command-line skimming only available with input types '
                             + ','.join(skimmingInputTypes))
        flags.Derivation.skimmingExpression = runArgs.skimmingExpression
        if not hasattr(runArgs, 'skimmingContainers'):
            logDerivation.warning('All containers used for skimming must be listed with the skimmingContainers option - job likely to fail')
        else:
            flags.Derivation.dynamicConsumers = runArgs.skimmingContainers

    # Output files: every 'output...File' run argument (except the file-type
    # arguments and NTUP_PHYSVAL) becomes a pair of Output.* flags
    for runArg in dir(runArgs):
        if 'output' in runArg and 'File' in runArg and 'Type' not in runArg and 'NTUP_PHYSVAL' not in runArg:
            outputFileName = getattr(runArgs, runArg)
            streamFileTag = runArg.removeprefix("output")
            flags.addFlag(f'Output.{streamFileTag}Name', outputFileName)
            flags.addFlag(f'Output.doWrite{streamFileTag.removesuffix("File")}', True)
            flags.Output.doWriteDAOD = True

    # Fix campaign metadata
    from Campaigns.Utils import Campaign, campaign_runs
    if flags.Input.isMC and flags.Input.MCCampaign is Campaign.Unknown:
        if flags.Input.RunNumbers:
            # Look the campaign up from the first run number of the input
            mc_campaign = campaign_runs.get(flags.Input.RunNumbers[0], Campaign.Unknown)

            if mc_campaign is not Campaign.Unknown:
                flags.Input.MCCampaign = mc_campaign
                logDerivation.info('Will recover MC campaign to: %s', mc_campaign.value)

    # Pre-include
    processPreInclude(runArgs, flags)

    # Pre-exec
    processPreExec(runArgs, flags)

    # To respect --athenaopts
    flags.fillFromArgs()

    # Lock flags
    flags.lock()

    # The D(2)AOD building configuration
    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    cfg = MainServicesCfg(flags)

    # Pool file reading
    from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
    cfg.merge(PoolReadCfg(flags))

    # Ensure proper metadata propagation for EVNT->DAOD_TRUTHx
    if inputType == 'EVNT':
        from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
        cfg.merge(MetaDataSvcCfg(flags, ['IOVDbMetaDataTool']))

    # Further workaround for issues with overlap removal.
    # As further explained in JetCommonConfig.AddEventCleanFlagsCfg,
    # we can schedule multiple overlap removal algorithms which overwrite
    # each other's decorations.  To get the decorations locked, we put
    # all the OR-related decoration algorithms in the EventCleanSeq
    # sequence followed by decoration locking algorithms in EventCleanLockSeq.
    # Each derivation format ensures that these sequences are in the
    # correct place.  But we can still run into trouble when multiple
    # formats are combined.  When we merge two CAs both of which
    # contain the same sequence S, S will end up with the algorithms
    # from both, but S will stay at its existing position in the
    # first (destination) CA.  However, for these sequences, we need
    # them to run at the sequence's position in the second (source) CA,
    # i.e., the later of the two positions.  We accomplish this by
    # munging the CAs before merging: remove the sequence from the
    # destination CA and merge its algorithms to the source CA.
    def premerge(cfg, newcfg, seqnam):
        """Move the algorithms of sequence SEQNAM from CFG into NEWCFG.

        Does nothing unless both accumulators contain the sequence.
        """
        # Check if both CAs contain the requested sequence.
        seq = cfg.getSequence(seqnam)
        if not seq:
            return
        newseq = newcfg.getSequence(seqnam)
        if not newseq:
            return

        # Make a temporary CA with the sequence to hold algorithms
        # removed from CFG.
        from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
        ca = ComponentAccumulator()
        ca.addSequence(CompFactory.AthSequencer(seqnam, Sequential=True))

        # Remove the sequence's algorithms from CFG and add to the temp CA.
        # Iterate over a snapshot of the members: the algorithms are being
        # removed from this very sequence inside the loop, and mutating a
        # list while iterating over it can skip entries.
        for a in list(seq.Members):
            ca.addEventAlgo(cfg.popEventAlgo(a.getName(), seqnam), seqnam)

        # Remove the sequence itself (which should now be empty) from CFG.
        cfg.getSequence('AthAlgSeq').Members.remove(seq)

        # Add the moved algorithms to NEWCFG.
        newcfg.merge(ca)
        return

    for formatName in formats:
        # Each format is configured by the '<format>Cfg' entry of DerivationConfigList
        derivationConfig = getattr(DerivationConfigList, f'{formatName}Cfg')
        newcfg = derivationConfig(flags)
        premerge(cfg, newcfg, 'EventCleanSeq')
        premerge(cfg, newcfg, 'EventCleanLockSeq')
        cfg.merge(newcfg)

    # Pass-through mode (ignore skimming and accept all events)
    # NOTE(review): this triggers on the mere presence of the attribute,
    # whatever its value - confirm that is the intended semantics.
    if hasattr(runArgs, 'passThrough'):
        logDerivation.info('Pass-through mode was requested. All events will be written to the output.')
        for algo in cfg.getEventAlgos():
            if isinstance(algo, CompFactory.DerivationFramework.DerivationKernel):
                algo.SkimmingTools = []

    # PerfMonSD
    if flags.PerfMon.doFullMonMT or flags.PerfMon.doFastMonMT:
        from PerfMonComps.PerfMonCompsConfig import PerfMonMTSvcCfg
        cfg.merge(PerfMonMTSvcCfg(flags))

    # Write AMI tag into in-file metadata
    from PyUtils.AMITagHelperConfig import AMITagCfg
    cfg.merge(AMITagCfg(flags, runArgs, fixBroken=True))

    # Fix generator metadata
    if flags.Input.isMC:
        from GeneratorConfig.Versioning import GeneratorVersioningFixCfg
        cfg.merge(GeneratorVersioningFixCfg(flags))

    # Write TagInfo (with the data-taking year for real data when it is known)
    from EventInfoMgt.TagInfoMgrConfig import TagInfoMgrCfg
    if not flags.Input.isMC and flags.Input.DataYear > 0:
        cfg.merge(TagInfoMgrCfg(flags, tagValuePairs={
            "data_year": str(flags.Input.DataYear)
        }))
    else:
        cfg.merge(TagInfoMgrCfg(flags))

    # Post-include
    processPostInclude(runArgs, flags, cfg)

    # Post-exec
    processPostExec(runArgs, flags, cfg)

    # Configure components' logging levels
    from AthenaConfiguration.Utils import setupLoggingLevels
    setupLoggingLevels(flags, cfg)

    # Run the final configuration and propagate its status as the exit code
    sc = cfg.run()
    sys.exit(not sc.isSuccess())

Variable Documentation

◆ jobPropertiesDisallowed

python.DerivationSkeleton.jobPropertiesDisallowed

Definition at line 13 of file DerivationSkeleton.py.

python.TransformUtils.processPreExec
def processPreExec(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:41
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
vtune_athena.format
format
Definition: vtune_athena.py:14
python.TransformUtils.processPostExec
def processPostExec(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:50
python.TransformUtils.processPostInclude
def processPostInclude(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:69
python.TransformUtils.processPreInclude
def processPreInclude(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:62
python.PerfMonCompsConfig.PerfMonMTSvcCfg
def PerfMonMTSvcCfg(flags, **kwargs)
A minimal new-style configuration for PerfMonMTSvc.
Definition: PerfMonCompsConfig.py:10
python.TagInfoMgrConfig.TagInfoMgrCfg
def TagInfoMgrCfg(flags, tagValuePairs={})
Definition: TagInfoMgrConfig.py:6
python.Utils.setupLoggingLevels
def setupLoggingLevels(flags, ca)
Definition: Control/AthenaConfiguration/python/Utils.py:50
convertTimingResiduals.sum
sum
Definition: convertTimingResiduals.py:55
python.PerfMonConfigHelpers.setPerfmonFlagsFromRunArgs
def setPerfmonFlagsFromRunArgs(flags, runArgs)
Definition: PerfMonConfigHelpers.py:3
python.MainServicesConfig.MainServicesCfg
def MainServicesCfg(flags, LoopMgr='AthenaEventLoopMgr')
Definition: MainServicesConfig.py:260
python.DerivationSkeleton.fromRunArgs
def fromRunArgs(runArgs)
Definition: DerivationSkeleton.py:16
python.CommonRunArgsToFlags.commonRunArgsToFlags
def commonRunArgsToFlags(runArgs, configFlags)
Definition: CommonRunArgsToFlags.py:12
python.Versioning.GeneratorVersioningFixCfg
def GeneratorVersioningFixCfg(flags)
Definition: Versioning.py:72
beamspotman.dir
string dir
Definition: beamspotman.py:623
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.AMITagHelperConfig.AMITagCfg
def AMITagCfg(flags, runArgs=None, fixBroken=False)
Definition: AMITagHelperConfig.py:77
python.MetaDataSvcConfig.MetaDataSvcCfg
def MetaDataSvcCfg(flags, toolNames=[], tools=[])
Definition: MetaDataSvcConfig.py:6
python.AllConfigFlags.initConfigFlags
def initConfigFlags()
Definition: AllConfigFlags.py:19
str
Definition: BTagTrackIpAccessor.cxx:11
python.PoolReadConfig.PoolReadCfg
def PoolReadCfg(flags)
Definition: PoolReadConfig.py:69