ATLAS Offline Software
Loading...
Searching...
No Matches
python.DerivationSkeleton Namespace Reference

Functions

 fromRunArgs (runArgs)

Variables

 jobPropertiesDisallowed

Function Documentation

◆ fromRunArgs()

python.DerivationSkeleton.fromRunArgs ( runArgs)

Definition at line 16 of file DerivationSkeleton.py.

def fromRunArgs(runArgs):
    """Configure and run a derivation (D(2)AOD) job from transform run arguments.

    Steps, in order:
      * build configuration flags from ``runArgs``: common run args, PerfMon,
        the single input file type, augmentation parent/child streams, the
        requested output formats, optional command-line skimming, output file
        names, MC campaign / MC channel-number recovery, pre-include/pre-exec
        and ``--athenaopts``; then lock the flags;
      * build the main ComponentAccumulator, merging POOL reading, metadata
        handling and one ``<format>Cfg(flags)`` per requested format taken
        from ``DerivationConfigList``;
      * add AMI-tag, generator-versioning (MC) and TagInfo configuration,
        apply post-include/post-exec and logging levels;
      * run the job and exit the interpreter.

    :param runArgs: transform run-arguments object. Optional settings are
        detected with ``hasattr``; absent ones are skipped.
    :raises ValueError: if not exactly one of the supported input types is
        given, an augmentation is not of the form ``child:parent``, no output
        formats are given, or command-line skimming is requested with any
        format list other than ``['SKIM']`` or with an input other than
        DAOD_PHYS/DAOD_PHYSLITE.
    :raises AttributeError: if a requested format has no ``<format>Cfg``
        attribute in ``DerivationConfigList``.

    Never returns normally: ends with ``sys.exit`` with status 0 when the run
    succeeds and 1 otherwise.
    """
    from AthenaCommon.Logging import logging
    logDerivation = logging.getLogger('Derivation')
    logDerivation.info('****************** STARTING DERIVATION *****************')

    logDerivation.info('**** Transformation run arguments')
    logDerivation.info(str(runArgs))

    logDerivation.info('**** Setting-up configuration flags')
    from AthenaConfiguration.AllConfigFlags import initConfigFlags
    flags = initConfigFlags()
    flags.Exec.EventPrintoutInterval = 100
    commonRunArgsToFlags(runArgs, flags)

    flags.Common.ProductionStep = ProductionStep.Derivation

    # Switch on PerfMon
    from PerfMonComps.PerfMonConfigHelpers import setPerfmonFlagsFromRunArgs
    setPerfmonFlagsFromRunArgs(flags, runArgs)

    # Input types: exactly one input<Type>File argument must be present.
    allowedInputTypes = [ 'AOD', 'DAOD_PHYS', 'DAOD_PHYSLITE', 'EVNT' ]
    availableInputTypes = [ hasattr(runArgs, f'input{inputType}File') for inputType in allowedInputTypes ]
    if sum(availableInputTypes) != 1:
        # List the accepted type names, not the per-type presence booleans.
        raise ValueError('Input must be exactly one of the following types: '+','.join(allowedInputTypes))
    selectedInputType = allowedInputTypes[availableInputTypes.index(True)]
    flags.Input.Files = getattr(runArgs, f'input{selectedInputType}File')

    # Augmentations
    # For the time being one parent (primary stream) can have multiple children (augmentations)
    # However, an augmentation cannot have multiple parents, which will be supported in the future
    if hasattr(runArgs, 'augmentations'):
        for val in runArgs.augmentations:
            if ':' not in val or len(val.split(':')) != 2:
                logDerivation.error('Derivation job started, but with wrong augmentation syntax - aborting')
                raise ValueError('Invalid augmentation argument: {0}'.format(val))
            child, parent = val.split(':')
            flags.addFlag(f'Output.DAOD_{child}ParentStream',f'DAOD_{parent}')
            # A parent may collect several children: create the list flag on
            # first use, then append to it.
            childStreamFlag = f'Output.DAOD_{parent}ChildStream'
            if not flags.hasFlag(childStreamFlag):
                flags.addFlag(childStreamFlag, [f'DAOD_{child}'])
            else:
                flags._set(childStreamFlag, flags._get(childStreamFlag) + [f'DAOD_{child}'])
            logDerivation.info('Setting up event augmentation as {0} => {1}'.format(child, parent))

    # Output formats
    formats = []
    if hasattr(runArgs, 'formats'):
        formats = runArgs.formats
        logDerivation.info('Will attempt to make the following derived formats: {0}'.format(formats))
    else:
        logDerivation.error('Derivation job started, but with no output formats specified - aborting')
        raise ValueError('No derived formats specified')

    # Command line skimming - limited to SKIM format and PHYS/PHYSLITE input
    if hasattr(runArgs, 'skimmingExpression'):
        if runArgs.skimmingExpression:
            if not (len(formats)==1 and formats[0]=='SKIM'):
                raise ValueError('Command-line skimming only available with SKIM format')
            skimmingInputTypes = [ 'DAOD_PHYS', 'DAOD_PHYSLITE' ]
            availableSkimmingInputs = [ hasattr(runArgs, f'input{inputType}File') for inputType in skimmingInputTypes ]
            if sum(availableSkimmingInputs) != 1:
                # List the accepted type names, not the presence booleans.
                raise ValueError('Command-line skimming only available with input types '+','.join(skimmingInputTypes))
            flags.Derivation.skimmingExpression = runArgs.skimmingExpression
            if not hasattr(runArgs, 'skimmingContainers'):
                logDerivation.warning('All containers used for skimming must be listed with the skimmingContainers option - job likely to fail')
            else:
                flags.Derivation.dynamicConsumers = runArgs.skimmingContainers

    # Output files: every output*File run argument (except *Type* and
    # NTUP_PHYSVAL ones) becomes an Output.<X>FileName flag plus an
    # Output.doWrite<X> switch.
    for runArg in dir(runArgs):
        if 'output' in runArg and 'File' in runArg and 'Type' not in runArg and 'NTUP_PHYSVAL' not in runArg:
            outputFileName = getattr(runArgs, runArg)
            outputKey = runArg.removeprefix("output")
            flags.addFlag(f'Output.{outputKey}Name', outputFileName)
            flags.addFlag(f'Output.doWrite{outputKey.removesuffix("File")}', True)
            flags.Output.doWriteDAOD = True

    # Fix campaign metadata: recover an unknown MC campaign from the first run number.
    from Campaigns.Utils import Campaign, campaign_runs
    if flags.Input.isMC and flags.Input.MCCampaign is Campaign.Unknown:
        if flags.Input.RunNumbers:
            mc_campaign = campaign_runs.get(flags.Input.RunNumbers[0], Campaign.Unknown)
            if mc_campaign is not Campaign.Unknown:
                flags.Input.MCCampaign = mc_campaign
                logDerivation.info('Will recover MC campaign to: %s', mc_campaign.value)

    # Fix MC channel number with EVNT files
    if selectedInputType == 'EVNT':
        # runNumber is MC channel number in reco
        if hasattr(runArgs, 'runNumber') and flags.Input.MCChannelNumber != runArgs.runNumber:
            logDerivation.warning('Got different MC channel number (%d) from runNumber than from metadata (%d)', runArgs.runNumber, flags.Input.MCChannelNumber)
            flags.Input.MCChannelNumber = runArgs.runNumber
        elif flags.Input.MCChannelNumber == 0 and flags.Input.RunNumbers and flags.Input.RunNumbers[0] != 0:
            logDerivation.info('Will recover MC channel number to: %d', flags.Input.RunNumbers[0])
            flags.Input.MCChannelNumber = flags.Input.RunNumbers[0]
        else:
            logDerivation.info('MC channel number: %d', flags.Input.MCChannelNumber)

    # Pre-include
    processPreInclude(runArgs, flags)

    # Pre-exec
    processPreExec(runArgs, flags)

    # To respect --athenaopts
    flags.fillFromArgs()

    # Lock flags
    flags.lock()

    # The D(2)AOD building configuration
    from AthenaConfiguration.MainServicesConfig import MainServicesCfg
    cfg = MainServicesCfg(flags)

    # Pool file reading
    from AthenaPoolCnvSvc.PoolReadConfig import PoolReadCfg
    cfg.merge(PoolReadCfg(flags))

    # Ensure proper metadata propagation for EVNT->DAOD_TRUTHx
    if selectedInputType == 'EVNT':
        from AthenaServices.MetaDataSvcConfig import MetaDataSvcCfg
        cfg.merge(MetaDataSvcCfg(flags, ['IOVDbMetaDataTool']))

    # Further workaround for issues with overlap removal.
    # As further explained in JetCommonConfig.AddEventCleanFlagsCfg,
    # we can schedule multiple overlap removal algorithms which overwrite
    # each other's decorations. To get the decorations locked, we put
    # all the OR-related decoration algorithms in the EventCleanSeq
    # sequence followed by decoration locking algorithms in EventCleanLockSeq.
    # Each derivation format ensures that these sequences are in the
    # correct place. But we can still run into trouble when multiple
    # formats are combined. When we merge two CAs both of which
    # contain the same sequence S, S will end up with the algorithms
    # from both, but S will stay at its existing position in the
    # first (destination) CA. However, for these sequences, we need
    # them to run at the sequence's position in the second (source) CA,
    # i.e., the later of the two positions. We accomplish this by
    # munging the CAs before merging: remove the sequence from the
    # destination CA and merge its algorithms to the source CA.
    def premerge (cfg, newcfg, seqnam):
        """Move sequence SEQNAM's algorithms from CFG into NEWCFG.

        Does nothing unless both CAs contain the sequence. Afterwards the
        sequence is removed from CFG's AthAlgSeq, so merging NEWCFG into CFG
        places it at NEWCFG's (later) position.
        """
        # Check if both CAs contain the requested sequence.
        seq = cfg.getSequence(seqnam)
        if not seq: return
        newseq = newcfg.getSequence(seqnam)
        if not newseq: return

        # Make a temporary CA with the sequence to hold algorithms
        # removed from CFG.
        from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
        ca = ComponentAccumulator()
        ca.addSequence (CompFactory.AthSequencer (seqnam, Sequential=True))

        # Remove the sequence's algorithms from CFG and add to the temp CA.
        # popEventAlgo removes each algorithm from seq.Members, so iterate
        # over a snapshot: mutating a sequence while iterating it skips
        # elements and would leave algorithms behind.
        for a in list(seq.Members):
            ca.addEventAlgo (cfg.popEventAlgo (a.getName(), seqnam), seqnam)

        # Remove the sequence itself (which should now be empty) from CFG.
        cfg.getSequence('AthAlgSeq').Members.remove (seq)

        # Add the moved algorithms to NEWCFG.
        newcfg.merge (ca)
        return

    for formatName in formats:
        derivationConfig = getattr(DerivationConfigList, f'{formatName}Cfg')
        newcfg = derivationConfig(flags)
        premerge (cfg, newcfg, 'EventCleanSeq')
        premerge (cfg, newcfg, 'EventCleanLockSeq')
        cfg.merge(newcfg)

    # Pass-through mode (ignore skimming and accept all events)
    if hasattr(runArgs, 'passThrough'):
        logDerivation.info('Pass-through mode was requested. All events will be written to the output.')
        for algo in cfg.getEventAlgos():
            if isinstance(algo, CompFactory.DerivationFramework.DerivationKernel):
                algo.SkimmingTools = []

    # Write AMI tag into in-file metadata
    from PyUtils.AMITagHelperConfig import AMITagCfg
    cfg.merge(AMITagCfg(flags, runArgs, fixBroken=True))

    # Fix generator metadata
    if flags.Input.isMC:
        from GeneratorConfig.Versioning import GeneratorVersioningFixCfg
        cfg.merge(GeneratorVersioningFixCfg(flags))

    # Write TagInfo (real data with a known year also records data_year)
    from EventInfoMgt.TagInfoMgrConfig import TagInfoMgrCfg
    if not flags.Input.isMC and flags.Input.DataYear > 0:
        cfg.merge(TagInfoMgrCfg(flags, tagValuePairs={
            "data_year": str(flags.Input.DataYear)
        }))
    else:
        cfg.merge(TagInfoMgrCfg(flags))

    # Post-include
    processPostInclude(runArgs, flags, cfg)

    # Post-exec
    processPostExec(runArgs, flags, cfg)

    # Configure components' logging levels
    from AthenaConfiguration.Utils import setupLoggingLevels
    setupLoggingLevels(flags, cfg)

    # Run the final configuration; exit status is 0 on success, 1 on failure.
    sc = cfg.run()
    sys.exit(not sc.isSuccess())
STL class.

Variable Documentation

◆ jobPropertiesDisallowed

python.DerivationSkeleton.jobPropertiesDisallowed

Definition at line 13 of file DerivationSkeleton.py.