ATLAS Offline Software
Loading...
Searching...
No Matches
python.DerivationSkeleton Namespace Reference

Functions

 fromRunArgs (runArgs)

Variables

 jobPropertiesDisallowed

Function Documentation

◆ fromRunArgs()

python.DerivationSkeleton.fromRunArgs ( runArgs)

Definition at line 16 of file DerivationSkeleton.py.

16def fromRunArgs(runArgs):
17 from AthenaCommon.Logging import logging
18 logDerivation = logging.getLogger('Derivation')
19 logDerivation.info('****************** STARTING DERIVATION *****************')
20
21 logDerivation.info('**** Transformation run arguments')
22 logDerivation.info(str(runArgs))
23
24 logDerivation.info('**** Setting-up configuration flags')
25 from AthenaConfiguration.AllConfigFlags import initConfigFlags
26 flags = initConfigFlags()
27 flags.Exec.EventPrintoutInterval = 100
28 commonRunArgsToFlags(runArgs, flags)
29
30 flags.Common.ProductionStep = ProductionStep.Derivation
31
32 # Switch on PerfMon
33 from PerfMonComps.PerfMonConfigHelpers import setPerfmonFlagsFromRunArgs
34 setPerfmonFlagsFromRunArgs(flags, runArgs)
35
36 # Input types
37 allowedInputTypes = [ 'AOD', 'DAOD_PHYS', 'DAOD_PHYSLITE', 'EVNT' ]
38 availableInputTypes = [ hasattr(runArgs, f'input{inputType}File') for inputType in allowedInputTypes ]
39 if sum(availableInputTypes) != 1:
40 raise ValueError('Input must be exactly one of the following types: '+','.join(map(str, availableInputTypes)))
41 idx = availableInputTypes.index(True)
42 flags.Input.Files = getattr(runArgs, f'input{allowedInputTypes[idx]}File')
43
44 # Augmentations
45 # For the time being one parent (primary stream) can have multiple children (augmentations)
46 # However, an augmentation cannot have multiple parents, which will be supported in the future
47 if hasattr(runArgs, 'augmentations'):
48 for val in runArgs.augmentations:
49 if ':' not in val or len(val.split(':')) != 2:
50 logDerivation.error('Derivation job started, but with wrong augmentation syntax - aborting')
51 raise ValueError('Invalid augmentation argument: {0}'.format(val))
52 else:
53 child, parent = val.split(':')
54 flags.addFlag(f'Output.DAOD_{child}ParentStream',f'DAOD_{parent}')
55 childStreamFlag = f'Output.DAOD_{parent}ChildStream'
56 if not flags.hasFlag(childStreamFlag):
57 flags.addFlag(childStreamFlag, [f'DAOD_{child}'])
58 else:
59 flags._set(childStreamFlag, flags._get(childStreamFlag) + [f'DAOD_{child}'])
60 logDerivation.info('Setting up event augmentation as {0} => {1}'.format(child, parent))
61
62 # Output formats
63 formats = []
64 if hasattr(runArgs, 'formats'):
65 formats = runArgs.formats
66 logDerivation.info('Will attempt to make the following derived formats: {0}'.format(formats))
67 else:
68 logDerivation.error('Derivation job started, but with no output formats specified - aborting')
69 raise ValueError('No derived formats specified')
70
71 # Command line skimming - limited to SKIM format and PHYS/PHYSLITE input
72 if hasattr(runArgs, 'skimmingExpression'):
73 if runArgs.skimmingExpression:
74 if not (len(formats)==1 and formats[0]=='SKIM'):
75 raise ValueError('Command-line skimming only available with SKIM format')
76 availableInputTypes = [ hasattr(runArgs, f'input{inputType}File') for inputType in [ 'DAOD_PHYS', 'DAOD_PHYSLITE' ] ]
77 if sum(availableInputTypes) != 1:
78 raise ValueError('Command-line skimming only available with input types '+','.join(map(str, availableInputTypes)))
79 flags.Derivation.skimmingExpression = runArgs.skimmingExpression
80 if not hasattr(runArgs, 'skimmingContainers'):
81 logDerivation.warning('All containers used for skimming must be listed with the skimmingContainers option - job likely to fail')
82 else:
83 flags.Derivation.dynamicConsumers = runArgs.skimmingContainers
84
85 # Output files
86 for runArg in dir(runArgs):
87 if 'output' in runArg and 'File' in runArg and 'Type' not in runArg and 'NTUP_PHYSVAL' not in runArg:
88 outputFileName = getattr(runArgs, runArg)
89 flagString = f'Output.{runArg.removeprefix("output")}Name'
90 flags.addFlag(flagString, outputFileName)
91 flags.addFlag(f'Output.doWrite{runArg.removeprefix("output").removesuffix("File")}', True)
92 flags.Output.doWriteDAOD = True
93
94 # Fix campaign metadata
95 from Campaigns.Utils import Campaign, campaign_runs
96 if flags.Input.isMC and flags.Input.MCCampaign is Campaign.Unknown:
97 if flags.Input.RunNumbers:
98 mc_campaign = campaign_runs.get(flags.Input.RunNumbers[0], Campaign.Unknown)
99
100 if mc_campaign is not Campaign.Unknown:
101 flags.Input.MCCampaign = mc_campaign
102 logDerivation.info('Will recover MC campaign to: %s', mc_campaign.value)
103
104 # Pre-include
105 processPreInclude(runArgs, flags)
106
107 # Pre-exec
108 processPreExec(runArgs, flags)
109
110 # To respect --athenaopts
111 flags.fillFromArgs()
112
113 # Lock flags
114 flags.lock()
115
116 # The D(2)AOD building configuration
117 from AthenaConfiguration.MainServicesConfig import MainServicesCfg
118 cfg = MainServicesCfg(flags)
119
120 # Pool file reading
121 from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
122 cfg.merge(PoolReadCfg(flags))
123
124 # Ensure proper metadata propagation for EVNT->DAOD_TRUTHx
125 if (allowedInputTypes[idx]=='EVNT'):
126 from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
127 cfg.merge(MetaDataSvcCfg(flags, ['IOVDbMetaDataTool']))
128
129 # Further workaround for issues with overlap removal.
130 # As further explained in JetCommonConfig.AddEventCleanFlagsCfg,
131 # we can schedule multiple overlap removal algorithms which overwrite
132 # each other's decorations. To get the decorations locked, we put
133 # all the OR-related decoration algorithms in the EventCleanSeq
134 # sequence followed by decoration locking algorithms in EventCleanLockSeq.
135 # Each derivation format ensures that these sequences are in the
136 # correct place. But we can still run into trouble when multiple
137 # formats are combined. When we merge two CAs both of which
138 # contain the same sequence S, S will end up with the algorithms
139 # from both, but S will stay at its existing position in the
140 # first (destination) CA. However, for these sequences, we need
141 # them to run at the sequence's position in the second (source) CA,
142 # i.e., the later of the two positions. We accomplish this by
143 # munging the CAs before merging: remove the sequence from the
144 # destination CA and merge its algorithms to the source CA.
145 def premerge (cfg, newcfg, seqnam):
146 # Check if both CAs contain the requested sequence.
147 seq = cfg.getSequence(seqnam)
148 if not seq: return
149 newseq = newcfg.getSequence(seqnam)
150 if not newseq: return
151
152 # Make a temporary CA with the sequence to hold algorithms
153 # removed from CFG.
154 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
155 ca = ComponentAccumulator()
156 ca.addSequence (CompFactory.AthSequencer (seqnam, Sequential=True))
157
158 # Remove the sequence's algorithms from CFG and add to the temp CA.
159 for a in seq.Members:
160 ca.addEventAlgo (cfg.popEventAlgo (a.getName(), seqnam), seqnam)
161
162 # Remove the sequence itself (which should now by empty) from CFG.
163 cfg.getSequence('AthAlgSeq').Members.remove (seq)
164
165 # Add the moved algorithms to NEWCFG.
166 newcfg.merge (ca)
167 return
168
169 for formatName in formats:
170 derivationConfig = getattr(DerivationConfigList, f'{formatName}Cfg')
171 newcfg = derivationConfig(flags)
172 premerge (cfg, newcfg, 'EventCleanSeq')
173 premerge (cfg, newcfg, 'EventCleanLockSeq')
174 cfg.merge(newcfg)
175
176 # Pass-through mode (ignore skimming and accept all events)
177 if hasattr(runArgs, 'passThrough'):
178 logDerivation.info('Pass-through mode was requested. All events will be written to the output.')
179 for algo in cfg.getEventAlgos():
180 if isinstance(algo, CompFactory.DerivationFramework.DerivationKernel):
181 algo.SkimmingTools = []
182
183 # Write AMI tag into in-file metadata
184 from PyUtils.AMITagHelperConfig import AMITagCfg
185 cfg.merge(AMITagCfg(flags, runArgs, fixBroken=True))
186
187 # Fix generator metadata
188 if flags.Input.isMC:
189 from GeneratorConfig.Versioning import GeneratorVersioningFixCfg
190 cfg.merge(GeneratorVersioningFixCfg(flags))
191
192 # Write TagInfo
193 if not flags.Input.isMC and flags.Input.DataYear > 0:
194 from EventInfoMgt.TagInfoMgrConfig import TagInfoMgrCfg
195 cfg.merge(TagInfoMgrCfg(flags, tagValuePairs={
196 "data_year": str(flags.Input.DataYear)
197 }))
198 else:
199 from EventInfoMgt.TagInfoMgrConfig import TagInfoMgrCfg
200 cfg.merge(TagInfoMgrCfg(flags))
201
202 # Post-include
203 processPostInclude(runArgs, flags, cfg)
204
205 # Post-exec
206 processPostExec(runArgs, flags, cfg)
207
208 # Configure components' logging levels
209 from AthenaConfiguration.Utils import setupLoggingLevels
210 setupLoggingLevels(flags, cfg)
211
212 # Run the final configuration
213 sc = cfg.run()
214 sys.exit(not sc.isSuccess())
STL class.

Variable Documentation

◆ jobPropertiesDisallowed

python.DerivationSkeleton.jobPropertiesDisallowed

Definition at line 13 of file DerivationSkeleton.py.